Federico Zambelli
04/17/2023, 3:21 PM
RuntimeError: There is no current event loop in thread 'WorkerThread-0'.
I understand that Prefect runs its own event loop in the main thread, and flows are executed in a separate worker thread.
I can't seem to figure out a workaround for the issue.
For the record, my code (tl;dr) is as follows:
import asyncio

import asyncpraw
from prefect import flow, task

@task
def get_submission_from_ids(ids):
    reddit = asyncpraw.Reddit(...)
    return reddit.info(fullnames=ids)

@flow
async def get_data():
    sub_gen = get_submission_from_ids([...])  # this is an async generator
    submissions = [sub async for sub in sub_gen]

if __name__ == '__main__':
    asyncio.run(get_data())
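The error itself can be reproduced without Prefect. The following is a minimal sketch using only the standard library: calling asyncio.get_event_loop() from a thread that has no event loop set raises the same kind of RuntimeError. The thread name "WorkerThread-demo" and the helper name loop_lookup_in_thread are made up for illustration, and whether asyncpraw follows exactly this call path inside the task is an assumption.

```python
import asyncio
import threading


def loop_lookup_in_thread():
    """Call asyncio.get_event_loop() in a fresh thread and return the error, if any."""
    outcome = {"error": None}

    def worker():
        try:
            # No event loop has been set for this thread, so the lookup fails.
            asyncio.get_event_loop()
        except RuntimeError as exc:
            outcome["error"] = exc

    # Hypothetical thread name, chosen to resemble the one in the traceback.
    thread = threading.Thread(target=worker, name="WorkerThread-demo")
    thread.start()
    thread.join()
    return outcome["error"]


error = loop_lookup_in_thread()
print(type(error).__name__, error)
```

Code that needs a loop in such a thread has to either run inside a coroutine driven by that thread's loop or create the loop explicitly.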
If I move get_submission_from_ids directly into the flow get_data, the run completes successfully and I get the Submission objects, but then I get this other error at the very end:
Zanie
04/17/2023, 3:47 PM
Federico Zambelli
04/17/2023, 3:49 PM
The info method returns an async generator itself, so I don't think it needs to be called from an async function.
Nevertheless, even if I make get_submission_from_ids async and then await it, I still get an error.
Where the Reddit instance gets created matters.
If I create it in the main flow and then pass it down to the function, it now works; the only error I have left is the one I copy-pasted above.
Zanie
04/17/2023, 4:03 PM
What error do you get when get_submission_from_ids is async?
Federico Zambelli
04/17/2023, 4:04 PM
asyncprawcore.exceptions.RequestException: error with request Event loop is closed
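For context on the message text: asyncio raises RuntimeError("Event loop is closed") when work is scheduled on a loop that has already been closed. Here is a minimal sketch of that behaviour using only the standard library. The helper name schedule_on_closed_loop is hypothetical, and the idea that the asyncpraw session was still bound to a loop that had already been shut down is an assumption, not something confirmed in this thread.

```python
import asyncio


def schedule_on_closed_loop():
    """Try to schedule a callback on a loop that has already been closed."""
    loop = asyncio.new_event_loop()
    loop.close()
    try:
        # Scheduling checks the loop state first and refuses closed loops.
        loop.call_soon(lambda: None)
    except RuntimeError as exc:
        return str(exc)
    return None


message = schedule_on_closed_loop()
print(message)
```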
Zanie
04/17/2023, 4:18 PM
Are you using the reddit object outside of the task afterwards?
Federico Zambelli
04/17/2023, 4:20 PM
submissions = [sub async for sub in sub_gen]
But to be fair, after moving the reddit instance to the flow and passing it down to the task, the generator now works, so that fixed it.
I was just left with this error here:
Zanie
04/17/2023, 4:23 PM
Federico Zambelli
04/17/2023, 4:23 PM
18:26:14.386 | INFO | prefect.engine - Created flow run 'huge-pegasus' for flow 'Get subreddit data'
18:26:15.817 | INFO | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - Loaded result from file: 30bf6315f9485ee82b4e6d2fbe7329be.pkl
18:26:16.042 | INFO | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - Created task run 'Get submission from ids-0' for task 'Get submission from ids'
18:26:16.043 | INFO | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - Executing 'Get submission from ids-0' immediately...
18:26:17.636 | INFO | Task run 'Get submission from ids-0' - Finished in state Completed()
18:26:18.267 | INFO | Flow run "r/dataengineering | from '2023-04-14' to '2023-04-15'" - [Submission(id='12l9mzx'), ...]
18:26:18.533 | INFO | Flow run 'huge-pegasus' - Finished in state Completed('All states completed.')
18:26:18.763 | ERROR | asyncio - Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x7f7a102656d0>
18:26:18.764 | ERROR | asyncio - Unclosed connector connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f7a1022f9a0>, 6351.362316427)]'] connector: <aiohttp.connector.TCPConnector object at 0x7f7a19c9dc10>
Zanie
04/17/2023, 4:38 PM
Federico Zambelli
04/17/2023, 4:38 PM
Zanie
04/17/2023, 4:38 PM
Federico Zambelli
04/17/2023, 4:38 PM
Zanie
04/17/2023, 4:38 PM
Federico Zambelli
04/17/2023, 4:40 PM
await reddit.close()
[...] async IO and am not aware of many concepts
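The arrangement that ended up working in this thread (client created inside the async flow, passed down to the helper, closed explicitly at the end) can be sketched as follows. This is a sketch under stated assumptions, not the actual Prefect or asyncpraw code: FakeReddit is a hypothetical stand-in that only mimics an async-generator info method and an awaitable close method, and the ids are invented. It uses only the standard library so that it runs as written.

```python
import asyncio


class FakeReddit:
    """Hypothetical stand-in for an async client such as asyncpraw.Reddit."""

    def __init__(self):
        self.closed = False

    async def info(self, fullnames):
        # Async generator: callers consume it with `async for`.
        for name in fullnames:
            await asyncio.sleep(0)  # pretend to do network I/O
            yield f"Submission({name})"

    async def close(self):
        self.closed = True


def get_submissions(client, ids):
    # Plain function: it only returns the async generator, it does not await.
    return client.info(fullnames=ids)


async def get_data(ids):
    client = FakeReddit()  # created inside the running event loop
    try:
        subs = [sub async for sub in get_submissions(client, ids)]
        return subs, client
    finally:
        # Release the client before the event loop shuts down.
        await client.close()


submissions, client = asyncio.run(get_data(["t3_a", "t3_b"]))
print(submissions, client.closed)
```

With the real client, the explicit cleanup step corresponds to the await reddit.close() call mentioned above.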