# prefect-community
f
Hey folks, I'm having some trouble running Prefect with async. I'm currently using asyncpraw to query some Reddit data, but I'm running into this error:
RuntimeError: There is no current event loop in thread 'WorkerThread-0'
As I understand it, Prefect runs its own event loop in the main thread, and flows are executed in a separate worker thread. I can't figure out a workaround for the issue. For the record, my code (tl;dr) looks like this:
@task
def get_submission_from_ids(ids):
    reddit = asyncpraw.Reddit(...)
    return reddit.info(fullnames=ids)

@flow
async def get_data():
    sub_gen = get_submission_from_ids([...])  # this is an async generator
    submissions = [sub async for sub in sub_gen]

if __name__ == '__main__':
    asyncio.run(get_data())
For the record, if I move the code in `get_submission_from_ids` directly into the flow `get_data`, the run completes successfully and I get the Submission objects, but then I get this other error at the very end:
z
Does asyncpraw need to be used from an async method?
f
Not necessarily: the `info` method returns an async generator itself, so I don't think it needs to be called from an async method. Nevertheless, even if I make `get_submission_from_ids` async and then await it, I still get an error.
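To illustrate the point about async generators, here is a minimal stdlib-only sketch. `fetch_items` and `get_items` are hypothetical stand-ins (not asyncpraw or Prefect APIs): a plain synchronous function can hand back an async generator without awaiting anything, and the caller consumes it with `async for` inside a running event loop.

```python
import asyncio


async def fetch_items(ids):
    """Hypothetical async generator, standing in for a client method that yields results lazily."""
    for item_id in ids:
        await asyncio.sleep(0)  # placeholder for real network I/O
        yield f"item-{item_id}"


def get_items(ids):
    """Synchronous wrapper: calling an async generator function only creates the
    generator object, so nothing needs to be awaited here."""
    return fetch_items(ids)


async def main():
    gen = get_items([1, 2, 3])
    # The actual I/O happens here, inside the running event loop.
    return [item async for item in gen]


items = asyncio.run(main())
print(items)
```

Note that this only shows the generator mechanics. Whether the client object that produces the generator can be constructed outside an event loop is a separate question.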
Extra piece of info: apparently, where the `Reddit` instance gets created matters. If I create it in the main flow and then pass it down to the function, it now works; the only error I have left is the one I pasted above.
z
Yeah when you create an instance they’re looking for an event loop
There isn’t one in a synchronous function, so it fails.
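A small stdlib-only sketch of why this fails, assuming the client looks up the current event loop when it is constructed (the helper names and the thread name `demo-worker` are made up for illustration). In a worker thread, synchronous code has no current event loop, while code running inside a coroutine on that same kind of thread does.

```python
import asyncio
import threading


def probe_sync(results):
    """Runs as plain synchronous code in a worker thread."""
    try:
        asyncio.get_event_loop()
        results.append("found a loop")
    except RuntimeError as exc:
        # No loop has been set for this thread, so the lookup fails.
        results.append(f"RuntimeError: {exc}")


async def probe_async(results):
    """Runs inside a coroutine, so a running loop is guaranteed."""
    asyncio.get_running_loop()
    results.append("found a running loop")


def run_in_worker(target, *args):
    """Run `target` in a separate thread and wait for it to finish."""
    thread = threading.Thread(target=target, args=args, name="demo-worker")
    thread.start()
    thread.join()


sync_results = []
run_in_worker(probe_sync, sync_results)

async_results = []
run_in_worker(lambda: asyncio.run(probe_async(async_results)))

print(sync_results)
print(async_results)
```

This would be consistent with the observation that creating the instance inside the async flow works while creating it inside a synchronous task does not.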
What error happens when `get_submission_from_ids` is async?
f
same exact one
actually no wait
sorry
in the case you mentioned, the error is different:
asyncprawcore.exceptions.RequestException: error with request Event loop is closed
z
That's a weird one. Are you using the `reddit` object outside of the task afterwards?
It also might be an issue with returning an async generator, I’m not sure we support that.
f
indirectly yes when running:
submissions = [sub async for sub in sub_gen]
But to be fair, after moving the Reddit instance to the flow and passing it down to the task, the generator now works, so that fixed it. I was just left with this error:
z
You'll need to share the full traceback (preferably not as a screenshot)
f
ah there isn't a lot but sure, gimme 1 minute
Here it is. The run does finish successfully, but it seems the Session doesn't get closed properly:
18:26:14.386 | INFO    | prefect.engine - Created flow run 'huge-pegasus' for flow 'Get subreddit data'
18:26:15.817 | INFO    | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - Loaded result from file:  30bf6315f9485ee82b4e6d2fbe7329be.pkl
18:26:16.042 | INFO    | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - Created task run 'Get submission from ids-0' for task 'Get submission from ids'
18:26:16.043 | INFO    | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - Executing 'Get submission from ids-0' immediately...
18:26:17.636 | INFO    | Task run 'Get submission from ids-0' - Finished in state Completed()
18:26:18.267 | INFO    | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - [Submission(id='12l9mzx'), ...]
18:26:18.533 | INFO    | Flow run 'huge-pegasus' - Finished in state Completed('All states completed.')
18:26:18.763 | ERROR   | asyncio - Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x7f7a102656d0>
18:26:18.764 | ERROR   | asyncio - Unclosed connector connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f7a1022f9a0>, 6351.362316427)]'] connector: <aiohttp.connector.TCPConnector object at 0x7f7a19c9dc10>
z
Oh those are just error logs not actual errors
f
yeah sorry, I should have been more precise
z
Anyway, looks like you’re probably supposed to context manage something?
Async things can’t auto teardown on shutdown like sync things can
f
I'll take a look, reading the docs of asyncpraw I didn't find much about managing context but maybe i missed something
z
You need to close them explicitly or context manage them
f
yep, you were right, I had missed exactly that: I needed to call `await reddit.close()`
thanks a lot for the help, I'm quite new to async IO and not familiar with many of its concepts
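For anyone landing here later, a minimal sketch of the two cleanup patterns discussed above. `DemoClient` is a hypothetical stand-in written with the standard library only. It is not asyncpraw's `Reddit` class, so check the asyncpraw docs for what the real client supports. The idea is the same either way: close the client explicitly in a `finally` block, or let `async with` do it.

```python
import asyncio


class DemoClient:
    """Hypothetical async client that owns a session needing explicit cleanup."""

    def __init__(self):
        self.closed = False

    async def close(self):
        await asyncio.sleep(0)  # placeholder for closing the underlying session
        self.closed = True

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()


async def explicit_close():
    client = DemoClient()
    try:
        return client  # real work with the client would go here
    finally:
        # Runs even if the work above raises, so the session is not leaked.
        await client.close()


async def context_managed():
    async with DemoClient() as client:
        pass  # real work with the client would go here
    return client


closed_explicitly = asyncio.run(explicit_close())
closed_by_context = asyncio.run(context_managed())
print(closed_explicitly.closed, closed_by_context.closed)
```

Either way the cleanup happens while the event loop is still running, which is what avoids the "Unclosed client session" log lines at shutdown.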