Daniil Ponizov

    Daniil Ponizov

    1 year ago
    Hello! does Prefect support asynchronous tasks?
    Kevin Kho

    Kevin Kho

    1 year ago
    Hey @Daniil Ponizov, I think this is a good answer
    Zach Angell

    Zach Angell

    1 year ago
    We are also working on features to support asynchronous tasks 🙂
    Daniil Ponizov

    Daniil Ponizov

    1 year ago
    how can i get event loop inside task run?
    Kevin Kho

    Kevin Kho

    1 year ago
    How would you do it without Prefect?
    get_event_loop()
    ?
    Daniil Ponizov

    Daniil Ponizov

    1 year ago
    you mean use it like this?
    @task
    def some_task():
        loop = asyncio.get_event_loop()   loop.run_until_complete(asyncio.wait(futures))   
    
    with Flow("flow_name") as fl:
        some_task()
    sorry, cant format this snippet
    when i try to hide event loop in task it doesn't see any coroutine and just finish
    Kevin Kho

    Kevin Kho

    1 year ago
    Let me ask the team if anyone if familiar with async
    emre

    emre

    1 year ago
    Hey @Daniil Ponizov I tried a couple things and had success with the following snippet.
    @task
    def async_sleeper(i):
        loop = asyncio.new_event_loop()
    
        tasks = [loop.create_task(some_coro()) for _ in range(i)]
        loop.run_until_complete(asyncio.wait(tasks))
    I don't know what is happening, But using
    loop.create_task
    instead of
    asyncio.create_task
    did the trick for me. I can execute this task in a mapped fashion even.
    DaskExecutor
    works with the above setup, but
    LocalDaskExecutor
    does not
    I recommend using
    asyncio.new_event_loop
    instead of the usual
    get_event_loop
    . event loops are single threaded by default. python gets confused when multiple threads try to access the single event loop owned by the python process. Instead, create a fresh event loop at the start of your tasks with
    new_event_loop
    . Also its probably best if you close the loop when the task finishes.