# ask-community
a
Hey #CL09KU1K7 Running into a frustrating issue with async code in Prefect deployments. Code works perfectly locally but hangs indefinitely in k8s deployments. More details in 🧵
The Setup: I have a client library that only provides async methods for API calls:
import asyncio


class DataClient:
    async def fetch_config(self, resource_id):
        # Makes async HTTP calls to external API
        response = await self.http_client.get(f"/api/config/{resource_id}")
        return response.json()

class ConfigLoader:
    @classmethod
    def get_config(cls, resource_id):
        # This is the problematic line 👇
        data = asyncio.run(cls._fetch_data(resource_id))
        return data.configuration
    
    @staticmethod
    async def _fetch_data(resource_id):
        client = DataClient()
        return await client.fetch_config(resource_id)
The Problem:
• ✅ Local execution: Works fine when running as a script
• ❌ Prefect k8s deployment: Hangs forever with `RuntimeWarning: coroutine was never awaited`
Error in container logs:
RuntimeWarning: coroutine '_fetch_data' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
I suspect this is an event loop nesting issue, since Prefect probably runs its own event loop, but I need to maintain a sync interface for backwards compatibility with existing task definitions.
Question: What's the best pattern for calling async client libraries from within Prefect tasks? I've tried the Prefect `asyncutils` but got the same hanging behavior. Has anyone dealt with this pattern before? The client library is third-party, so I can't change it to be sync. Thanks! 🙏
cc @Greg Jeffrey
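For anyone following along, here is a minimal sketch of the nesting problem described above. It assumes the sync wrapper ends up being called on a thread whose event loop is already running; whether Prefect actually does that depends on the version and on how the task is invoked. `fetch_data` is a hypothetical stand-in for the third-party client call.

```python
import asyncio


async def fetch_data():
    # Hypothetical stand-in for the third-party async client call.
    await asyncio.sleep(0)
    return {"configuration": "ok"}


def get_config_sync():
    # Same shape as ConfigLoader.get_config: a sync wrapper around asyncio.run.
    return asyncio.run(fetch_data())


async def caller_with_running_loop():
    # Calling the sync wrapper while this thread's loop is running makes
    # asyncio.run refuse to start a second loop.
    try:
        get_config_sync()
    except RuntimeError as exc:
        return str(exc)
    return None


# No loop is running here, so the wrapper works as it does in a plain script.
plain_result = get_config_sync()

# Here the wrapper is reached from inside a running loop and fails.
nested_error = asyncio.run(caller_with_running_loop())

print(plain_result)
print(nested_error)
```

In the nested case the coroutine object is created but never scheduled, which is why Python also emits a "coroutine ... was never awaited" warning once that object is garbage collected.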
j
a
It's that I'm awaiting a coroutine inside a function called in my `@task`. The problem is nested event loops. I'm mostly wondering whether anyone has used thread-safe calls like `asyncio.run_coroutine_threadsafe`. Making the task `async` doesn't inherently solve my problem.
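A rough sketch of what the `asyncio.run_coroutine_threadsafe` route could look like, assuming a dedicated event loop that lives in its own daemon thread. `BackgroundLoop` and `fetch_data` are hypothetical names used for illustration, not part of Prefect or the client library.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical helper: owns one event loop running in a daemon thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro, timeout=None):
        # Submit the coroutine to the background loop and block the calling
        # thread until it finishes. The caller must not be the loop's own thread.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout)

    def close(self):
        # Ask the loop to stop, wait for its thread, then release resources.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def fetch_data(resource_id):
    # Hypothetical stand-in for the third-party async client call.
    await asyncio.sleep(0)
    return {"resource_id": resource_id}


background = BackgroundLoop()

# Called from plain sync code, with no loop running in this thread.
sync_result = background.run(fetch_data("abc"))


async def caller_with_running_loop():
    # Called while another loop is running in this thread. This blocks that
    # loop until the result arrives, but the work happens on the background loop.
    return background.run(fetch_data("xyz"))


nested_result = asyncio.run(caller_with_running_loop())
background.close()

print(sync_result, nested_result)
```

The trade-off is that the calling thread is blocked while it waits, which is what a sync interface implies anyway. Calling `run` from a coroutine that is itself executing on the background loop would deadlock.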
a
I'd recommend doing something like this:
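import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_coro_as_sync(coro):
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(coro)

    # A loop is already running in this thread. Blocking here on
    # run_coroutine_threadsafe(coro, loop).result() would stall that same
    # loop, so run the coroutine on a fresh loop in a worker thread instead.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()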


class ConfigLoader:
    @classmethod
    def get_config(cls, resource_id):
        # Previously the problematic line 👇, now routed through the helper
        data = run_coro_as_sync(cls._fetch_data(resource_id))
        return data.configuration
which should help your code run correctly in cases where there's no running event loop and cases where there is a running event loop.
a
Do I need to make the tasks or flows that call `run_coro_as_sync` async and await it?
Are you saying `run_coro_as_sync` actually executes the coro synchronously?
a
Yeah, it should make the coro execution synchronous, so you don't need to `await` the call.