# ask-community
k
Hi all — just curious what the best practice is for creating concurrent tasks within a flow. We are testing a flow for which we've set a global task concurrency limit of 4, and from that flow we're using the `.submit` method to launch many tasks concurrently (looping through a list). This worked fine for smaller flows, but breaks down for "longer" flows (i.e., where the number of tasks submitted exceeds the API task rate limit of 400). Curious to know if our current design of looping through a list and submitting tasks is suboptimal. For reference, we're converting a pure-Python ETL script to Prefect; the original script used threading (not asyncio).
k
what's the exact problem you're running into? Getting rate limited on task submission?
k
Yes - a 429 response, so I think it’s a Prefect Cloud issue now that I think about it…
k
ahh so you're just trying to submit lots of tasks up front
k
Yeah well I’m not trying to, if that makes sense. Is there a better method to submit tasks, like 4 at a time instead of 400 at a time?
We’re just naively using the submit method, and looping through a list of S3 keys
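The "4 at a time instead of 400 at a time" idea can be sketched in plain Python with the stdlib, independent of Prefect, mirroring the threading the original ETL script used (all names here are hypothetical):

```python
from concurrent.futures import ThreadPoolExecutor

def chunked(items, size):
    """Yield successive fixed-size batches from a list."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

def process_key(key):
    # placeholder for the real per-key ETL work
    return f"processed {key}"

def run_in_batches(keys, batch_size=4):
    results = []
    with ThreadPoolExecutor(max_workers=batch_size) as pool:
        for batch in chunked(keys, batch_size):
            # pool.map() blocks until the whole batch finishes,
            # so at most batch_size tasks are in flight at once
            results.extend(pool.map(process_key, batch))
    return results
```

The same chunking shape carries over to a flow: loop over `chunked(s3_keys, n)` and submit one batch at a time instead of submitting everything up front.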
k
I'd check out `rate_limit` here, this example specifically
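For reference, the pattern that docs page describes looks roughly like this. This is a sketch only: it assumes a recent Prefect 2.x with `rate_limit` available from `prefect.concurrency.sync`, a global concurrency limit named `"task-submission"` already created in the UI or CLI, and a hypothetical `process_object` task:

```python
from prefect import flow, task
from prefect.concurrency.sync import rate_limit

@task
def process_object(key):
    ...  # per-key ETL work

@flow
def my_flow(s3_keys):
    for key in s3_keys:
        # blocks until a slot on the "task-submission" limit frees up,
        # throttling how fast tasks are submitted to the API
        rate_limit("task-submission")
        process_object.submit(key)
```

Running this requires a Prefect server or Cloud workspace, so it is not executable standalone.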
k
Thanks! I’ll check it out, not sure how we missed that page in the docs
k
don't worry we know stuff is hard to find sometimes! never hesitate to ask because at least half the docs is burned into my mind now
j
To de-risk a similar project, I processed my data in chunks: separate flows for 3 months of data at a time, and within each flow I submitted my tasks in batches of about 20-30, updating a status table as each batch finished. I didn't want to redo more tasks than necessary in the event of a failure. I think the batch sizes could have been bigger (to use my infra more efficiently), but generally no regrets over breaking it down into easier-to-troubleshoot-sized chunks. I just used a tag on each task rather than the rate_limit API:
```python
@task(tags=[MY_PROJECT_CONCURRENCY_TAG])
def do_some_stuff(...):
    ...
```
You still have to set the concurrency limit on the tag, but there wasn't any extra code in the task body itself
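Setting that tag-based limit is a one-liner with the Prefect CLI (the tag name below is just a placeholder; use whatever tag you put on the task):

```shell
# cap tasks carrying this tag at 4 concurrent runs
prefect concurrency-limit create my-project-concurrency-tag 4
```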
k
Thanks for the info, I’m going to try all suggestions