Morning! I have an issue with the latest prefect ...
# ask-community
w
Morning! I have an issue with the latest prefect release (2.16.0), the version 2.15.0 works fine. I have this minimal workable code async code.
Copy code
@flow
async def myflow(experiment_id: int):
    engine = create_async_engine(
        url=get_db_url(),
        connect_args={
            'statement_cache_size': 0,
            'prepared_statement_cache_size': 0,
        },
    )

    async with AsyncSession(engine) as session:
        await session.get(Experiment, experiment_id)

if __name__ == '__main__':

    loop = asyncio.run(myflow(189))
But if I use a task, I got the following error:
Copy code
RuntimeError: Task <Task pending name='Task-27' coro=<AsyncSession.close() running at /Users/william/.pyenv/versions/myenv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py:1016> cb=[shield.<locals>._inner_done_callback() at /Users/william/.pyenv/versions/3.11.5/lib/python3.11/asyncio/tasks.py:881]> got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop
Full stracktrace on thread. Should I open a github ticket?
If I execute this code with task:
Copy code
@task()
async def task_get_experiment_from_db(experiment_id: int, session: AsyncSession) -> Experiment:
    experiment = await session.get(Experiment, experiment_id)
    return experiment

@flow
async def myflow(experiment_id: int):
    engine = create_async_engine(
        url=get_db_url(),
        connect_args={
            'statement_cache_size': 0,
            'prepared_statement_cache_size': 0,
        },
    )

    async with AsyncSession(engine) as session:
        await task_get_experiment_from_db(experiment_id=experiment_id, session=session)


if __name__ == '__main__':

    loop = asyncio.run(myflow(189))
I got this error:
traceback.py
c
Hi William, at the current time we wouldn't support passing something like a database connection from a flow to a task that way. The task will generally execute on a different thread from the flow with a different event loop. We are working on some options to run tasks on the main application thread in more cases, but even with those changes, this may be unreliable for you.
thank you 1
Our recommendation would be to open new connections (in this case SA sessions) within the tasks rather than passing them to the tasks as parameters
upvote 1
thank you 1
c
@Chris Guidry I was having a similar issue but with aiohttp client. I thought the PR has been merged (at least in 2.15) but I was still experiencing the issue with 2.16. Is this change reverted in 2.16 for some reason?
c
Yes, sorry, we did revert it because it was causing hangs in some other conditions. We'll be working toward bringing that capability back but we're not quite there yet.
👍 1
c
Awesome, thanks for clarifying!
125 Views