Joël Luijmes
11/02/2020, 4:39 PMrun_until_complete
on the loop right? But can I also use the async initializers? __aenter__
Joël Luijmes
11/02/2020, 4:58 PMclass AsyncTask(Task):
def run(self):
return asyncio.run(self._bootstrap_run_async())
@abc.abstractclassmethod
async def run_async(self):
raise NotImplementedError('Abstract method')
async def _bootstrap_run_async(self):
if self.__aenter__:
await self.__aenter__()
hit_except = False
try:
return await self.run_async()
except Exception:
hit_except = True
if not self.__aexit__ or not await self.__aexit__(*sys.exc_info()):
raise
finally:
if self.__aexit__ and not hit_except:
await self.__aexit__(None, None, None)
Ernest
01/14/2021, 12:59 PM