# prefect-getting-started
c
Hi, I've a script which is reading data, dividing it up and writing it across multiple tasks via a map: about 300 write tasks in total, with a concurrency of 50. I'm seeing that each task lasts about 20-30 seconds, even though timestamps inside each task (which does nothing more than a single HTTP POST of less than 1 MB) show a far more reasonable 0.5 s or so. What is happening with these tasks for the 95% of their apparent execution time other than blocking the concurrency slots? Even removing the ENTIRE contents of the task, they still take the same time... what's going on? These tasks are, as above, created by a map() on Prefect 2.20.2. Could the volume of tasks created be a key point?
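(For reference, a minimal sketch of the kind of flow being described, with invented names and an invented endpoint, since the real script isn't shown; it assumes the concurrency of 50 comes from a tag-based limit on an "uploads" tag:)

```python
# Hypothetical reconstruction of the described setup: ~300 mapped write
# tasks, each doing a single sub-1 MB HTTP POST, limited to 50 at a time
# via a tag-based concurrency limit on the "uploads" tag.
import httpx
from prefect import flow, task

@task(tags=["uploads"])  # tag whose concurrency limit is set to 50
def write_chunk(chunk: bytes) -> int:
    resp = httpx.post("https://example.invalid/ingest", content=chunk)  # hypothetical endpoint
    resp.raise_for_status()
    return resp.status_code

@flow
def write_all(data: bytes, chunk_size: int = 1_000_000):
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    write_chunk.map(chunks)  # ~300 task runs for ~300 chunks
```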
n
hi @Chris Phillips - can you show how you're trying to enforce concurrency here?
> What is happening with these tasks for the 95% of their apparent execution time other than blocking the concurrency slots?
it feels likely to me that something is wrong with the concurrency configuration here
c
well, in general I'm just setting the `ConcurrentTaskRunner`.
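(i.e. something like the following minimal sketch, assuming Prefect 2.x; the flow body is a stand-in, since the real code isn't shown:)

```python
# What "setting the ConcurrentTaskRunner" looks like in Prefect 2.x;
# the flow body is a placeholder.
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    ...
```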
What I've noticed now though, which I feel is the core issue, is that if I map or gather a large number of tasks, the tasks take longer to run.
If I run this test code:
```python
 1 import asyncio
 2 import random
 3 import time
 4
 5 from prefect import flow, task, get_run_logger
 6
 7 async def async_func():
 8     await asyncio.sleep(5)
 9     return 1
10
11 @task(tags=["limit_10"])
12 async def my_task(n):
13     result = await async_func()
14     return result
15
16 @flow()
17 async def my_flow():
18     tasks = []
19     for n in range(10):
20         t = my_task(n)
21         tasks.append(t)
22
23     await asyncio.gather(*tasks, return_exceptions=True)
24
25 if __name__ == "__main__":
26     asyncio.run(my_flow())
```
so that my_task can, via tags, only run 10 at a time. When it's range(10), each task takes 6 s: 5 s of sleep plus 1 s for scheduling or whatever. When I change line 19 to range(100), the first 10 tasks take around 15 seconds, and when it's range(250) the first 10 tasks take 37 s. By the time we're down to the last 10 of the larger groups, they're back to completing in 6 seconds. As per the previous picture, the behaviour is visually very clear on the range(250) graph.
[image.png: task duration graph for the range(250) run]
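(For context: the `limit_10` tag limit referenced here would have been created beforehand, e.g. in the UI, via `prefect concurrency-limit create limit_10 10`, or programmatically; a sketch against the Prefect 2.x client API as I understand it:)

```python
# Sketch: creating the tag-based limit used above ("limit_10" -> 10 slots).
import asyncio
from prefect.client.orchestration import get_client

async def create_limit():
    async with get_client() as client:
        await client.create_concurrency_limit(tag="limit_10", concurrency_limit=10)

if __name__ == "__main__":
    asyncio.run(create_limit())
```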
So the more tasks you create, the longer they will each take?
Ultimately I can appreciate a best-practice take where 250 tasks is poor design, however I've not seen anything to that effect. I can change my design strategy accordingly, but in my actual use case I've a pile of sub-1 MB files I need to upload, and one task each initially felt like the most natural way to do that, since task concurrency implies I can manage and visualise a rate limit for their upload trivially outside of code. So I could make a fixed number of "uploader" tasks and split the files between them, giving each a queue of 100 to churn through...
n
> Ultimately I can appreciate a best-practice take where 250 tasks is poor design

this isn't a take I want to give! we have done a ton of work in 3.x to make running tasks more lightweight

> So the more tasks you create, the longer they will each take?

in 2.x we hold onto task results/states in memory, so we have linear memory growth the more tasks you run, which we fixed in 3.x. I'm not sure that would directly correspond to tasks "taking longer" necessarily. i can still look into the behavior you're seeing in 2.20.2, but are you tied to 2.x at all?
in 3.x, n = 250: the first run is without a concurrency limit set, the second with `limit_10` set to 10
c
Nope, not tied in the slightest. Very new to it, but I only see 3.x RC images for Docker, so thought it best to let it get a proper release first
That 2nd graph doesn't look too happy to me still...? I'd expect a lot of 5/6s blocks? Looks like it's not limiting the concurrency at all?
n
so i think that has to do with how the example was written + the fact you're using tags for concurrency: it's creating the task run in the API before the client-side code notices the slots are filled. you might want something like this?
```python
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect.futures import wait


async def async_func():
    await asyncio.sleep(5)
    return 1


@task
async def my_task(n):
    return await async_func()


@flow
async def my_flow(num_tasks: int = 10):
    futures = []
    for n in range(num_tasks):
        async with concurrency("limit_10", occupy=10):
            futures.append(my_task.submit(n))

    wait(futures)


if __name__ == "__main__":
    asyncio.run(my_flow(250))  # type: ignore
```
i.e. a global concurrency limit via the `concurrency` context manager vs tags. we've kept tag concurrency around but GCLs have some advantages
> thought it best to let it get a proper release first
makes a lot of sense! GA will be in the next couple weeks, but appreciate any tires you feel like kicking in the meantime
c
I've just gone up to 3rc19-p3.12 and I'm not seeing that nice behaviour at all. The concurrency limit seems to keep being recreated constantly, maybe? And apparently disabled and set to 1?
Yeah, you're running concurrency() 250 times, right?
n
the image was from the exact code I shared above, yeah. the only other thing i did was go into the UI and set the GCL for `limit_10` to 10
c
oh, I'm getting a lot of sqlite3 "database is locked" errors, that probably explains it? I'll go to bed and run this in a better test env with postgres tomorrow, thanks!
n
ohhh! jeez, yeah, i was running against Cloud. postgres should not have this problem; the sqlite db struggles with heavy concurrency. sounds good! no problem
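(For reference, switching a self-hosted Prefect server off sqlite means pointing `PREFECT_API_DATABASE_CONNECTION_URL` at Postgres; a sketch with placeholder credentials:)

```python
# Sketch: pointing the Prefect server at Postgres instead of sqlite.
# Normally you'd export this in the environment or set it in a prefect
# profile; os.environ is used here purely for illustration, and the
# credentials are placeholders.
import os

os.environ["PREFECT_API_DATABASE_CONNECTION_URL"] = (
    "postgresql+asyncpg://user:password@localhost:5432/prefect"
)
```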
c
Nope, still didn't work nicely on an AWS node with postgres. Instead I tweaked my real code to create 10 smaller lists of data out of one large list and feed each smaller list to a task to work through. Creating only 10 tasks made it vastly more performant. And that was without any asyncio / async code whatsoever.
mo tasks, mo problems.
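(A sketch of that batching approach, with invented names, since the real code isn't shown:)

```python
# Sketch: split one large list into ~10 smaller lists and give each to a
# single task, instead of one task per item.
from prefect import flow, task

@task
def process_batch(items: list) -> int:
    for item in items:
        pass  # stand-in for the real per-item write
    return len(items)

@flow
def process_all(items: list, n_batches: int = 10):
    batches = [items[i::n_batches] for i in range(n_batches)]  # round-robin split
    process_batch.map(batches)  # only 10 task runs, regardless of item count
```

Mapping over `batches` produces one task run per batch, so the orchestration overhead scales with the number of batches rather than the number of items.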
n
lol yeah if they're heavy tasks I could see that
there is also (especially in 2.x) non-zero memory/IO overhead associated with the orchestration of tasks
c
Well, they're not, in the grand scheme of things. I've been writing data out to a database here. But I also have a query side to obtain the data. That query side is now annoying me: I can see the actual function code not executing for up to 15 seconds after the 50 query tasks are map()ed.
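(A sketch of how that submit-to-execute gap can be measured, with invented names: record the time just before the map() call and log the delta on the first line of each task:)

```python
# Sketch: measuring how long after map() each task's body actually starts.
import time
from prefect import flow, task, unmapped, get_run_logger

@task
def query_task(i: int, submitted_at: float) -> None:
    delay = time.monotonic() - submitted_at
    get_run_logger().info("task %s began executing %.1fs after map()", i, delay)

@flow
def measure(n: int = 50):
    query_task.map(range(n), submitted_at=unmapped(time.monotonic()))

if __name__ == "__main__":
    measure()
```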
this is on 3rc19 now, but yes, had a few OOMs today, annoyingly. I've made my code write the data to disk ASAP and then read it back later, to avoid using up a gig of memory in lists.
Hi, I've continued to work on this, and essentially I seem to be using fewer and fewer tasks. What was up to 200 tasks, each with a clear, atomic function, has been crunched down to 5 "query" tasks and 5 "write" tasks, joined with a Queue(). Even overlapping the two sets of tasks seemed to notably reduce efficiency, so I'm just waiting for all the query tasks to end before writing out all the Queue data... a 3-minute flow is now just 10 seconds, and could be (irrelevantly) faster if I could use the Queues simultaneously. That said, none of this is async outside of using ConcurrentTaskRunner, which may well be why even overlapping the tasks is notably slower.
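(A sketch of the pattern described, with invented names; it assumes a thread-based runner, all tasks sharing one process and therefore a plain queue.Queue. ConcurrentTaskRunner is the 2.x name used in the message; in 3.x the equivalent is ThreadPoolTaskRunner:)

```python
# Sketch: 5 "query" tasks fill a shared queue; once they all finish,
# 5 "write" tasks drain it. A module-level Queue works because the
# thread-based task runner keeps every task in the same process.
import queue
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner  # ThreadPoolTaskRunner in 3.x

RESULTS: queue.Queue = queue.Queue()

@task
def query_chunk(chunk_id: int) -> None:
    for row in range(100):  # stand-in for real query results
        RESULTS.put((chunk_id, row))

@task
def write_batch() -> int:
    written = 0
    while True:
        try:
            RESULTS.get_nowait()  # stand-in for the real DB write
        except queue.Empty:
            return written
        written += 1

@flow(task_runner=ConcurrentTaskRunner())
def pipeline(n_queries: int = 5, n_writers: int = 5):
    queries = [query_chunk.submit(i) for i in range(n_queries)]
    for f in queries:
        f.wait()  # all queries finish before any writing, as described above
    writers = [write_batch.submit() for _ in range(n_writers)]
    return [f.result() for f in writers]
```

Draining the queue only after every producer has finished matches the message above; overlapping producers and consumers would need a sentinel or counting scheme so the writers know when to stop.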
n
hey @Chris Phillips - do you have a representative / runnable example you could share? definitely interested in enabling ergonomic & performant enough patterns, so seeing what you're doing would be helpful
c
Here's my current code. Don't laugh too much.
A typical run visually is looking like this...