Mohamed Haggag
12/05/2024, 2:43 PM
import aiohttp
import asyncio
import time

async def fetch(session, url):
    # Issue one GET request and return the response body as text
    async with session.get(url) as response:
        return await response.text()

async def main():
    url = "http://127.0.0.1:5000/simulate"
    tasks = []
    # Share one session across all requests; gather inside the block
    # so the session is still open while the requests run
    async with aiohttp.ClientSession() as session:
        for _ in range(1000):  # Adjust number of requests
            tasks.append(fetch(session, url))
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())
• This code runs fetch concurrently 1000 times, all asynchronously in a single thread.
• After exploring Prefect's .submit() / .map(), it became clear to me that Prefect's concurrency model is multithreading, which adds significant overhead from launching and switching between threads in I/O-bound use cases like the one above.
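For reference, the single-thread claim in the first bullet can be checked with the standard library alone. This is only a minimal sketch: fake_fetch and run_many are hypothetical names, and asyncio.sleep stands in for the real HTTP call so that no server or aiohttp install is needed.

```python
import asyncio
import threading


async def fake_fetch(i, thread_ids):
    # Hypothetical stand-in for an HTTP request: asyncio.sleep yields to the
    # event loop the same way awaiting a network response would.
    thread_ids.add(threading.get_ident())  # thread before the await
    await asyncio.sleep(0.01)
    thread_ids.add(threading.get_ident())  # thread after resuming
    return i


async def run_many(n):
    # Collect the ID of every thread that executed any part of any coroutine.
    thread_ids = set()
    results = await asyncio.gather(*(fake_fetch(i, thread_ids) for i in range(n)))
    return results, thread_ids


if __name__ == "__main__":
    results, thread_ids = asyncio.run(run_many(1000))
    print(len(results), "coroutines ran on", len(thread_ids), "thread(s)")
```

All coroutines record the same thread ID because asyncio.gather schedules them on the one event loop that asyncio.run creates, and no worker threads are started.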
My question:
• How can I convert fetch to a @task and main to a @flow (to benefit from the retry and tracking infrastructure) while still executing the tasks asynchronously in a single thread?
• I understand it is possible to use Dask or Ray as task executors, and these libraries seem to independently support async execution in a single thread (much like what you would expect in vanilla Python), so how can I set them up with Prefect to work like that?
Mohamed Haggag
12/05/2024, 3:35 PM
Nate
12/05/2024, 3:39 PM
Mohamed Haggag
12/05/2024, 6:21 PM