Is there any way to limit concurrency when mapping...
# prefect-community
a
Is there any way to limit concurrency when mapping over a task? I want to send a bunch of API requests, but the endpoint I’m hitting is pretty fragile. I want exactly the map functionality (feed in a list and have each item execute), but with a limited execution rate. Either running sequentially or limiting to N concurrent requests would be great. I’ve achieved the same thing by arbitrarily breaking the requests I need to send into chunks, but then each chunk succeeds or fails as a group, which I’m hoping to avoid.
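For reference, the behavior being asked for (at most N requests in flight, and each item succeeding or failing on its own rather than as part of a chunk) can be sketched without Prefect using only the standard library. This is an illustration of the desired semantics, not a Prefect feature; `call_fragile_api` and `bounded_map` are hypothetical names, and the "every fifth item fails" rule is made up to show per-item failure isolation.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def call_fragile_api(item: int) -> int:
    """Hypothetical stand-in for one API request; every fifth item fails."""
    time.sleep(0.01)
    if item % 5 == 0:
        raise RuntimeError(f"request for item {item} failed")
    return item * 2


def bounded_map(fn, items, max_concurrent: int) -> dict:
    """Apply fn to each item with at most `max_concurrent` calls in flight.

    Each item gets its own ("ok", value) or ("failed", message) outcome, so one
    failure does not take down the rest of the batch.
    """
    outcomes = {}
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        futures = {item: pool.submit(fn, item) for item in items}
        for item, future in futures.items():
            try:
                outcomes[item] = ("ok", future.result())
            except Exception as exc:  # record the failure for this item only
                outcomes[item] = ("failed", str(exc))
    return outcomes


outcomes = bounded_map(call_fragile_api, range(1, 11), max_concurrent=3)
```

Setting `max_concurrent=1` gives the fully sequential variant.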
c
This is a great question! A few options:
1.) You could implement a state handler on the mapped task that checks an external queue you maintain, to limit how many runs can enter a Running state.
2.) You could use Dask worker resources + Prefect tags, as described in this API doc: https://docs.prefect.io/api/unreleased/engine/executors.html#daskexecutor
3.) This functionality will be present in Prefect Cloud, where it will be able to work across flows.
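The idea behind Dask worker resources is that each worker advertises a fixed amount of some abstract resource, and a task that declares it needs some of that resource is only run where enough of it is free. The sketch below mimics that accounting with threads in a single process so the mechanism is visible; it is not Dask's or Prefect's API, and `ResourcePool`, `run_with_resource` and `fake_request` are hypothetical names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class ResourcePool:
    """Hypothetical local stand-in for a worker advertising a fixed amount of one resource."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.available = capacity
        self.peak_in_use = 0
        self._cond = threading.Condition()

    def acquire(self, amount: int) -> None:
        if amount > self.capacity:
            # Such a task could never be scheduled, so fail fast instead of blocking forever.
            raise ValueError(f"needs {amount}, but capacity is only {self.capacity}")
        with self._cond:
            # Block until enough of the resource is free, then claim it.
            self._cond.wait_for(lambda: self.available >= amount)
            self.available -= amount
            self.peak_in_use = max(self.peak_in_use, self.capacity - self.available)

    def release(self, amount: int) -> None:
        with self._cond:
            self.available += amount
            self._cond.notify_all()


def run_with_resource(pool: ResourcePool, amount: int, fn, *args):
    """Run fn only while holding `amount` units of the pool's resource."""
    pool.acquire(amount)
    try:
        return fn(*args)
    finally:
        pool.release(amount)


def fake_request(i: int) -> int:
    time.sleep(0.01)  # stand-in for a call to the fragile API
    return i


# Two units of an "API" resource, each request costs one unit: at most two run at once.
api_slots = ResourcePool(capacity=2)
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(
        executor.map(lambda i: run_with_resource(api_slots, 1, fake_request, i), range(10))
    )
```

Even with eight threads available, the resource count, not the thread count, caps how many requests run at once; that is the "fixed number of concurrency slots" effect.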
a
Option 2 sounds like the route I’d eventually go down. Would the logic in Prefect Cloud be that you can only have N flows running at once? Or that, regardless of how many flows are running, tagging a task limits the concurrency of everything with that tag? If the second, would I need to change how I wrote the code? Thanks!
c
We are currently implementing a feature for limiting the concurrency of all tasks (across all Flows) with a prescribed tag. The only change to your actual code would be to tag your tasks, e.g.,
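```python
from prefect import task

# The tags only label the task; the per-tag limits themselves would be configured in Cloud
@task(tags=["database", "google"])
def my_task():
    ...
```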
The concurrency limits for each tag would be set through the Cloud UI.
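To make the per-tag semantics concrete, here is a single-process analog: every call of a tagged function must hold one slot for each limited tag while it runs, so a task with several tags is bound by the tightest limit. This is only an illustration, not how Cloud implements it; `TAG_LIMITS` and `tag_limited` are hypothetical, and the numbers stand in for limits that would be set in the UI.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

# Hypothetical per-tag limits, playing the role of the limits that would be set in the Cloud UI.
TAG_LIMITS = {"database": 2, "google": 1}
_tag_slots = {tag: threading.BoundedSemaphore(n) for tag, n in TAG_LIMITS.items()}


def tag_limited(tags):
    """Hypothetical decorator: each call must hold a slot for every limited tag while it runs."""
    # Always acquire in sorted order so tasks with overlapping tags cannot deadlock.
    limited = sorted(tag for tag in set(tags) if tag in _tag_slots)

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            held = []
            try:
                for tag in limited:
                    _tag_slots[tag].acquire()
                    held.append(tag)
                return fn(*args, **kwargs)
            finally:
                for tag in reversed(held):
                    _tag_slots[tag].release()

        return wrapper

    return decorator


_lock = threading.Lock()
stats = {"active": 0, "peak": 0}


@tag_limited(["database", "google"])
def my_task(i: int) -> int:
    with _lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(0.01)
    with _lock:
        stats["active"] -= 1
    return i


with ThreadPoolExecutor(max_workers=6) as executor:
    results = list(executor.map(my_task, range(6)))
```

Because "google" has a limit of 1, these calls run strictly one at a time. Note that in-process semaphores only coordinate threads inside one Python process; enforcing the same limits across separate flow runs needs some shared, stateful service that every run consults.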
a
Hey Chris, circling back around to this: is any part of this going to be implemented in the open-source package? Or is all rate-limiting functionality going to be exclusive to the Cloud offering?
c
Exclusively in Cloud; the reason is that doing this properly across flows requires a stateful orchestration layer, which is one of the core things Cloud provides. There are plans to eventually have a free tier for Cloud, but I couldn’t give you a date for that right now.
a
Could the Cloud feature be adapted to limit concurrency within a single flow and added to the open-source core? Or would that also require the stateful layer? Thanks for answering the questions, btw!
c
Anytime! Limiting concurrency within the same flow requires a little work but is currently possible:
- option 1.) set up one or more shared queues populated with “tickets” for how many tasks can run simultaneously; using a task state handler on all the relevant tasks, have each one check the queue before entering a Running state, and otherwise wait until a ticket is available
- option 2.) set up a Dask cluster where each worker has a certain amount of a “Dask resource”, and use Prefect tags to determine how much of each resource a task uses; in this case you’d have a fixed number of “concurrency slots” (see https://docs.prefect.io/api/unreleased/engine/executors.html#daskexecutor for a little more info)
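The ticket mechanism from option 1 can be sketched with a standard-library `queue.Queue` pre-filled with N tickets: a run takes a ticket before starting (blocking while none are left) and puts it back when done. In Prefect the check would sit in a state handler that fires as the task moves to Running; here it is a plain function wrapper, so this shows the ticket logic only, not Prefect's state-handler API. `with_ticket`, `send_request` and `MAX_CONCURRENT` are hypothetical names.

```python
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT = 3

# Shared queue pre-filled with one "ticket" per allowed concurrent run.
tickets = queue.Queue()
for n in range(MAX_CONCURRENT):
    tickets.put(n)

_lock = threading.Lock()
stats = {"active": 0, "peak": 0}


def with_ticket(fn):
    """Take a ticket before running (blocking until one is free) and return it afterwards."""
    def wrapper(*args, **kwargs):
        ticket = tickets.get()  # blocks while every ticket is handed out
        try:
            return fn(*args, **kwargs)
        finally:
            tickets.put(ticket)  # hand the ticket back even if the call raised
    return wrapper


@with_ticket
def send_request(i: int) -> int:
    with _lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(0.01)  # stand-in for the API call
    with _lock:
        stats["active"] -= 1
    return i * 10


with ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(send_request, range(12)))
```

A `queue.Queue` is only shared between threads of one process; if mapped tasks run on a multi-process Dask cluster, the ticket store would have to live somewhere every worker can reach.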
@Marvin archive “How to limit concurrency of task runs?”
m