Chris Phillips
08/23/2024, 11:33 AM
Nate
08/25/2024, 8:47 PM
What is happening with these tasks for the 95% of their apparent execution time, other than blocking the concurrency slots?
it feels likely to me that something is wrong with the concurrency configuration here
Chris Phillips
08/26/2024, 7:49 PM
Chris Phillips
08/26/2024, 7:50 PM
Chris Phillips
08/26/2024, 7:57 PM
 1  import asyncio
 2  import random
 3  import time
 4
 5  from prefect import flow, task, get_run_logger
 6
 7  async def async_func():
 8      await asyncio.sleep(5)
 9      return 1
10
11  @task(tags=["limit_10"])
12  async def my_task(n):
13      result = await async_func()
14      return result
15
16  @flow()
17  async def my_flow():
18      tasks = []
19      for n in range(10):
20          t = my_task(n)
21          tasks.append(t)
22
23      await asyncio.gather(*tasks, return_exceptions=True)
24
25  if __name__ == "__main__":
26      asyncio.run(my_flow())
so that my_task can, via tags, only run 10 at a time. When it's range(10), each task takes 6s: 5s for the sleep plus about 1s for scheduling or whatever. When I change line 19 to range(100), the first 10 tasks take around 15 seconds, and when it's range(250), the first 10 tasks take 37s.
By the time we're down to the last 10 of the larger groups, they're back to completing in 6 seconds.
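For comparison, here is a minimal sketch of the baseline one might expect if the limiter added no per-task overhead. It is not Prefect code: it swaps the tag-based limit for a plain `asyncio.Semaphore`, and `LIMIT`, `WORK_SECONDS`, `limited_task` and `run_batch` are hypothetical names introduced only for this illustration. The 5s sleep is shortened so the sketch runs quickly. Under these assumptions each task's measured duration stays near the sleep time however many tasks are queued behind it, which is the behaviour the range(100) and range(250) runs depart from.

```python
import asyncio
import time

LIMIT = 10           # assumption: mirrors the limit of 10 on the "limit_10" tag
WORK_SECONDS = 0.05  # assumption: stand-in for the 5 s sleep, shortened for speed


async def limited_task(sem, durations, n):
    # Timing starts only once a slot is held, so queueing time is excluded.
    async with sem:
        start = time.perf_counter()
        await asyncio.sleep(WORK_SECONDS)
        durations[n] = time.perf_counter() - start


async def run_batch(num_tasks):
    # Run num_tasks tasks with at most LIMIT of them holding a slot at once.
    sem = asyncio.Semaphore(LIMIT)
    durations = {}
    await asyncio.gather(
        *(limited_task(sem, durations, n) for n in range(num_tasks))
    )
    return durations


if __name__ == "__main__":
    for size in (10, 100):
        d = asyncio.run(run_batch(size))
        first_ten = sum(d[n] for n in range(10)) / 10
        print(f"{size} tasks: mean duration of first 10 = {first_ten:.3f}s")
```

If the real flow showed the same flat profile, the extra seconds on the first 10 tasks could not be attributed to the work itself, only to whatever happens around slot acquisition.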
As per the previous picture, the behaviour is visually very clear on the range(250) graph.
Chris Phillips
08/26/2024, 8:05 PM
Chris Phillips
08/26/2024, 8:06 PM
Chris Phillips
08/26/2024, 8:07 PM
Nate
08/26/2024, 8:10 PM
Nate
08/26/2024, 8:19 PM
n = 250
first is without a concurrency limit set, second is limit_10 set to 10
Chris Phillips
08/26/2024, 8:20 PM
Chris Phillips
08/26/2024, 8:22 PM
Nate
08/26/2024, 8:30 PM
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect.futures import wait


async def async_func():
    await asyncio.sleep(5)
    return 1


@task
async def my_task(n):
    return await async_func()


@flow
async def my_flow(num_tasks: int = 10):
    futures = []
    for n in range(num_tasks):
        async with concurrency("limit_10", occupy=10):
            futures.append(my_task.submit(n))
    wait(futures)


if __name__ == "__main__":
    asyncio.run(my_flow(250))  # type: ignore
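To make the `occupy` argument in the snippet above easier to reason about, here is a toy model of slot accounting. It assumes that `occupy` is the number of slots a single acquisition takes from the named limit, which is my reading of the parameter rather than something stated in this thread. It is a plain-asyncio sketch and not Prefect's implementation; `SlotLimit` and `peak_holders` are hypothetical names.

```python
import asyncio


class SlotLimit:
    """Toy stand-in for a named limit with a fixed number of slots."""

    def __init__(self, limit):
        self.limit = limit
        self.in_use = 0
        self._cond = asyncio.Condition()

    async def acquire(self, occupy):
        if occupy > self.limit:
            raise ValueError("occupy exceeds the limit and could never be granted")
        async with self._cond:
            # Block until enough free slots exist, then take them all at once.
            await self._cond.wait_for(lambda: self.in_use + occupy <= self.limit)
            self.in_use += occupy

    async def release(self, occupy):
        async with self._cond:
            self.in_use -= occupy
            self._cond.notify_all()


async def peak_holders(num_tasks, occupy, limit=10):
    """Return the largest number of workers that held slots simultaneously."""
    slots = SlotLimit(limit)
    holders = 0
    peak = 0

    async def worker():
        nonlocal holders, peak
        await slots.acquire(occupy)
        try:
            holders += 1
            peak = max(peak, holders)
            await asyncio.sleep(0.01)  # placeholder for the real work
        finally:
            holders -= 1
            await slots.release(occupy)

    await asyncio.gather(*(worker() for _ in range(num_tasks)))
    return peak


if __name__ == "__main__":
    print("occupy=1: ", asyncio.run(peak_holders(25, occupy=1)))
    print("occupy=10:", asyncio.run(peak_holders(5, occupy=10)))
```

In this model, taking 1 slot per holder lets up to 10 holders overlap against a limit of 10, while taking all 10 slots per holder admits only one at a time. Whether that matters in the flow above depends on how long each holder keeps its slots: there the context manager wraps only the `submit` call, not the task's execution.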
Nate
08/26/2024, 8:30 PM
concurrency ctx manager vs tags
we've kept tag concurrency around but GCLs (global concurrency limits) have some advantages
Nate
08/26/2024, 8:40 PM
"thought it best to let it get a proper release first"
makes a lot of sense! GA will be in the next couple weeks, but appreciate any tires you feel like kicking in the meantime
Chris Phillips
08/26/2024, 8:50 PM
Chris Phillips
08/26/2024, 8:51 PM
Nate
08/26/2024, 8:52 PM
limit_10 to 10
Chris Phillips
08/26/2024, 8:55 PM
Nate
08/26/2024, 8:56 PM
Chris Phillips
08/27/2024, 6:18 PM
Chris Phillips
08/27/2024, 6:19 PM
Nate
08/27/2024, 6:19 PM
Nate
08/27/2024, 6:20 PM
Chris Phillips
08/27/2024, 6:21 PM
Chris Phillips
08/27/2024, 6:22 PM
Chris Phillips
09/03/2024, 2:26 PM
Nate
09/03/2024, 2:30 PM
Chris Phillips
09/03/2024, 2:37 PM
Chris Phillips
09/03/2024, 2:39 PM