Andrew Allen
07/29/2025, 7:40 PM

    class DataClient:
        async def fetch_config(self, resource_id):
            # Makes async HTTP calls to external API
            response = await self.http_client.get(f"/api/config/{resource_id}")
            return response.json()

    class ConfigLoader:
        @classmethod
        def get_config(cls, resource_id):
            # This is the problematic line
            data = asyncio.run(cls._fetch_data(resource_id))
            return data.configuration

        @staticmethod
        async def _fetch_data(resource_id):
            client = DataClient()
            return await client.fetch_config(resource_id)

The Problem:
• ✅ Local execution: Works fine when running as a script
• ❌ Prefect k8s deployment: Hangs forever with RuntimeWarning: coroutine was never awaited
Error in container logs:

    RuntimeWarning: coroutine '_fetch_data' was never awaited
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I suspect this is an event loop nesting issue since Prefect probably runs its own event loop, but I need to maintain a sync interface for backwards compatibility with existing task definitions.
Question: What's the best pattern for calling async client libraries from within Prefect tasks? I've tried Prefect's asyncutils but got the same hanging behavior.
Has anyone dealt with this pattern before? The client library is third-party so I can't change it to be sync.
Thanks!

Janet Carson
07/29/2025, 10:42 PM

Andrew Allen
07/30/2025, 11:54 AM
@task. The problem is nested event loops. I'm more interested in whether anyone has used thread-safe calls like asyncio.run_coroutine_threadsafe. Making the task async doesn't inherently solve my problem.

alex
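07/30/2025, 1:43 PM

    import asyncio

    def run_coro_as_sync(coro):
        try:
            # Only finds a loop that is running in the current thread
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None
        if loop and loop.is_running():
            # Caution: .result() blocks the current thread, which is the
            # thread this loop runs on, so the loop cannot advance the
            # scheduled coroutine while we wait here.
            return asyncio.run_coroutine_threadsafe(coro, loop).result()
        else:
            # No running loop in this thread: start one just for this call
            return asyncio.run(coro)

    class ConfigLoader:
        @classmethod
        def get_config(cls, resource_id):
            # Replaces the problematic asyncio.run(...) call
            data = run_coro_as_sync(cls._fetch_data(resource_id))
            return data.configuration
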
which should help your code run correctly both when there's no running event loop and when there is one.

Andrew Allen
07/30/2025, 3:45 PM
run_coro_as_sync async and await?

Andrew Allen
07/30/2025, 3:45 PM
run_coro_as_sync actually executes the coro synchronously?

alex
07/30/2025, 3:48 PM
await the call
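
For reference, here is a minimal sketch of a variant of the run_coro_as_sync helper above. It is not Prefect's API and was not confirmed by anyone in this thread. It rests on one fact about asyncio: get_running_loop() only returns a loop that is running in the current thread, so blocking on run_coroutine_threadsafe(...).result() from that same thread stalls the loop. The sketch instead runs the coroutine on a fresh event loop in a helper thread. The names run_coro_blocking, fetch_config, get_config, and caller_with_running_loop are hypothetical, and fetch_config is only a stand-in for the third-party async client.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_coro_blocking(coro):
    """Run `coro` to completion from synchronous code and return its result."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(coro)
    # A loop is already running in this thread. Run the coroutine on a
    # fresh loop in a helper thread and block until it finishes.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()


async def fetch_config(resource_id):
    # Hypothetical stand-in for the third-party async client call.
    await asyncio.sleep(0.01)
    return {"id": resource_id, "configuration": {"retries": 3}}


def get_config(resource_id):
    # Plain synchronous interface: callers do not use `await`.
    return run_coro_blocking(fetch_config(resource_id))["configuration"]


async def caller_with_running_loop():
    # Simulates sync code being invoked while an event loop is running.
    return get_config("abc")
```

Under these assumptions the call blocks until the coroutine finishes, so a sync caller uses the return value directly and does not await it. Two caveats: the outer loop's thread is still blocked while it waits, so other tasks on that loop pause, and objects tied to the outer loop (for example a client session created on it) may not work on the helper thread's loop.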