thiago
10/23/2025, 1:25 PMNikhil Ramanan
10/24/2025, 3:55 PMasyncio.run()thiago
10/24/2025, 4:50 PMasyncio.run() is way simpler:
def run_async_threadsafe(coro: Coroutine[Any, Any, T]) -> T:
"""Run an async coroutine from sync code, handling existing event loops.
Args:
coro: A coroutine object to run
Returns:
The result of the coroutine execution
Raises:
Any exception raised by the coroutine
"""
try:
# re-use existing event loop
running_loop = asyncio.get_running_loop()
future = asyncio.run_coroutine_threadsafe(coro, running_loop)
return future.result()
except RuntimeError:
# no existing event loop, use a new one
loop = asyncio.new_event_loop()
thread = threading.Thread(target=run_event_loop, args=(loop,), daemon=True)
thread.start()
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result()Nikhil Ramanan
10/24/2025, 4:53 PMthiago
10/24/2025, 4:57 PM