Hey folks, I'm having some troubles running Prefect using async. I am currently using asyncpraw to query some reddit data, but I am incurring in an error that says:
RuntimeError: There is no current event loop in thread 'WorkerThread-0'
. I understood that Prefect runs its own event loop in the main thread, and flows are executed in a separate worker thread. I can't seem to figure out a work around for the issue. For the record, my code (tldr) is as such:
def get_submission_from_ids(ids):
    reddit = asyncpraw.Reddit(...)
    return <http://reddit.info|reddit.info>(fullnames=ids)

async def get_data():
    sub_gen = get_submission_from_ids([...])  # this is an async generator
    submissions = [sub async for sub in sub_gen]

if __name__ == '__main__':
For the record, if I move the code in
directly to the flow
, the run completes successfully, i get the Submission objects, but then I get this other error at the very end:
Does asyncpraw need to be used from an async method?
not necessarily: the
method returns an async generator itself, so I don't think it needs to be itself in an async method. Nevertheless, even if I set
to be async, and then await it, i still get an error
Extra piece of info: Apparently, where the
instance gets created matters. If I create it in the main flow and then pass it down to the function, it now works, the only error I have left is the one I copy pasted above.
Yeah when you create an instance they’re looking for an event loop
There isn’t one in a synchronous function, so it fails.
What error happens when
is async?
same exact one
actually no wait
in the case you mentioned, the error is different:
asyncprawcore.exceptions.RequestException: error with request Event loop is closed
That’s a weird one — are you using the
object outside of the task afterwards?
It also might be an issue with returning an async generator, I’m not sure we support that.
indirectly yes when running:
submissions = [sub async for sub in sub_gen]
But to be fair, after moving the reddit instance to the flow and passing it down to the task, the generator now works, so that fixed it. I just was left with this error here:
You’ll need to share the full traceback (and not as a screenshot preferably)
ah there isn't a lot but sure, gimme 1 minute
Here it is. The run does finish successfully, but it seems the Session doesn't get closed properly:
18:26:14.386 | INFO    | prefect.engine - Created flow run 'huge-pegasus' for flow 'Get subreddit data'
18:26:15.817 | INFO    | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - Loaded result from file:  30bf6315f9485ee82b4e6d2fbe7329be.pkl
18:26:16.042 | INFO    | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - Created task run 'Get submission from ids-0' for task 'Get submission from ids'
18:26:16.043 | INFO    | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - Executing 'Get submission from ids-0' immediately...
18:26:17.636 | INFO    | Task run 'Get submission from ids-0' - Finished in state Completed()
18:26:18.267 | INFO    | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - [Submission(id='12l9mzx'), ...]
18:26:18.533 | INFO    | Flow run 'huge-pegasus' - Finished in state Completed('All states completed.')
18:26:18.763 | ERROR   | asyncio - Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x7f7a102656d0>
18:26:18.764 | ERROR   | asyncio - Unclosed connector connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f7a1022f9a0>, 6351.362316427)]'] connector: <aiohttp.connector.TCPConnector object at 0x7f7a19c9dc10>
Oh those are just error logs not actual errors
yeah sorry, I should have been more precise
Anyway, looks like you’re probably supposed to context manage something?
Async things can’t auto teardown on shutdown like sync things can
I'll take a look, reading the docs of asyncpraw I didn't find much about managing context but maybe i missed something
You need to close them explicitly or context manage them
yep, you were right, i had missed exactly that
needed to call
await reddit.close()
thanks a lot for the help, i am quite new with
async IO
and am not aware of many concepts