Jason Wiener
12/22/2023, 11:18 PM
Marvin
12/22/2023, 11:19 PM
To limit concurrency when using the `map` method in Prefect 2.x, you would typically use task run concurrency limits, which are configured with tags, or global concurrency limits. Global concurrency limits control how many tasks, flows, or other operations can run simultaneously, which is useful for optimizing resource usage and preventing bottlenecks.
Here are the steps to set up tag-based task run concurrency limits, with an example:
1. Define a Concurrency Limit:
Start by creating a concurrency limit for a tag. Every task run that carries this tag counts against the limit. You can create the limit with the Prefect CLI:
$ prefect concurrency-limit create <TAG> <LIMIT>
where `<TAG>` is the tag you want to apply to tasks, and `<LIMIT>` is the maximum number of concurrent task runs allowed for that tag.
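The admission rule a tag limit enforces can be modeled in plain Python. This is a hypothetical sketch, not Prefect's API: `limits` stands in for the limits created with `prefect concurrency-limit create`, `active` for how many runs currently hold a slot on each tag, and `can_start` encodes the rule Prefect's documentation describes, namely that a task run with several limited tags may start only when every one of those tags has a free slot.

```python
from typing import Dict, Iterable


def can_start(run_tags: Iterable[str], limits: Dict[str, int], active: Dict[str, int]) -> bool:
    """Return True if a task run carrying `run_tags` may enter a Running state.

    Tags with no configured limit never block; every limited tag must have
    fewer active runs than its limit.
    """
    for tag in run_tags:
        if tag in limits and active.get(tag, 0) >= limits[tag]:
            return False
    return True


# Hypothetical state: a limit of 2 on "example-limit" with two runs already active,
# and a limit of 5 on "database" with one run active.
limits = {"example-limit": 2, "database": 5}
active = {"example-limit": 2, "database": 1}

print(can_start(["example-limit"], limits, active))              # False: example-limit is full
print(can_start(["database"], limits, active))                   # True: 1 of 5 slots in use
print(can_start(["database", "example-limit"], limits, active))  # False: one full tag blocks the run
print(can_start(["untagged-work"], limits, active))              # True: no limit configured
```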
2. Assign Tags to Tasks:
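Assign the tag you created to the tasks that should be limited. This can be done when defining the task, or at call time during the flow run (for example with the `tags` context manager from `prefect`, or with `example_task.with_options(tags=["example-limit"])`).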
Here's an example of a task definition with the `tags` parameter:
```python
from prefect import task

@task(tags=["example-limit"])
def example_task(x):
    # Task implementation goes here; this placeholder just returns its input
    return x
```
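The call-time option can be pictured with a small stdlib model. In Prefect itself this is done with `from prefect import tags` and `with tags("example-limit"): ...`; the sketch below only models the idea, under the assumption that a run's effective tags are the task's own tags plus every tag from enclosing `tags(...)` blocks. The `tags` and `effective_tags` defined here are hypothetical stand-ins, not Prefect's implementation.

```python
import contextvars
from contextlib import contextmanager
from typing import FrozenSet, Iterable, Iterator

# Tags contributed by enclosing `tags(...)` blocks in the current context
_current_tags = contextvars.ContextVar("current_tags", default=frozenset())


@contextmanager
def tags(*new_tags: str) -> Iterator[FrozenSet[str]]:
    """Add `new_tags` for the duration of the block, restoring the previous set afterwards."""
    token = _current_tags.set(_current_tags.get() | frozenset(new_tags))
    try:
        yield _current_tags.get()
    finally:
        _current_tags.reset(token)


def effective_tags(task_tags: Iterable[str]) -> FrozenSet[str]:
    """Tags a run created right now would carry: its own plus the contextual ones."""
    return frozenset(task_tags) | _current_tags.get()


print(sorted(effective_tags(["etl"])))          # ['etl']
with tags("example-limit"):
    print(sorted(effective_tags(["etl"])))      # ['etl', 'example-limit']
    with tags("database"):
        print(sorted(effective_tags([])))       # ['database', 'example-limit']
print(sorted(effective_tags(["etl"])))          # ['etl']
```

Using a context variable keeps nested blocks independent: leaving a block restores exactly the tag set that was active before it.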
3. Map the Task:
When you use the `map` method to run tasks concurrently, Prefect applies the concurrency limit to each mapped task run based on the tags it carries. Here's an example of using `map`:
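```python
from prefect import flow

# Assumes `example_task` from step 2 is defined in this module
@flow
def example_flow():
    data = [1, 2, 3, 4, 5]  # replace with your own iterable of inputs
    # map submits one task run per element; each run carries the
    # "example-limit" tag, so at most <LIMIT> of them run at once
    mapped_results = example_task.map(data)
    # Wait for every mapped run and collect the return values
    return [future.result() for future in mapped_results]

if __name__ == "__main__":
    example_flow()
```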
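The effect on `map` can be simulated with the standard library. The sketch below submits one hypothetical "task run" per input to a thread pool, gates each run on a `threading.BoundedSemaphore` that stands in for the tag's limit, and records the peak number of runs in flight. It models the outcome only: the Prefect server does not block a worker thread, it delays the run's transition to Running and has the client retry (after 30 seconds by default) until a slot frees up.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def simulate_map(data: List[int], tag_limit: int, pool_size: int = 16) -> Dict[str, object]:
    """Run one simulated task per element, with at most `tag_limit` running at once."""
    slots = threading.BoundedSemaphore(tag_limit)  # stands in for the tag's concurrency limit
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def run(x: int) -> int:
        with slots:  # wait for a free slot on the tag
            with lock:
                state["running"] += 1
                state["peak"] = max(state["peak"], state["running"])
            time.sleep(0.01)  # pretend to do some work
            with lock:
                state["running"] -= 1
        return x * 2

    # More worker threads than slots, like a task runner that submits every mapped run at once
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        results = list(executor.map(run, data))
    return {"results": results, "peak": state["peak"]}


outcome = simulate_map(list(range(20)), tag_limit=3)
print(outcome["peak"])         # never more than 3
print(outcome["results"][:5])  # [0, 2, 4, 6, 8]
```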
4. Monitor Concurrency Limits:
You can monitor the concurrency limits you have set using the Prefect CLI or via the Prefect UI. In the CLI, you can use the following commands to inspect and list the concurrency limits:
$ prefect concurrency-limit inspect <TAG>
$ prefect concurrency-limit ls
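`inspect` reports, for one tag, the configured limit and the task runs currently holding its slots, while `ls` lists every limit. The sketch below models that bookkeeping with a hypothetical `SlotTable` class (not a Prefect API); for simplicity each run occupies a slot on a single tag and frees it when it finishes.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class SlotTable:
    """Hypothetical model of per-tag limits and the task runs holding their slots."""
    limits: Dict[str, int] = field(default_factory=dict)
    active: Dict[str, Set[str]] = field(default_factory=dict)

    def create(self, tag: str, limit: int) -> None:
        self.limits[tag] = limit
        self.active.setdefault(tag, set())

    def occupy(self, tag: str, run_id: str) -> bool:
        """Give `run_id` a slot on `tag` if one is free; report whether it got one."""
        if len(self.active[tag]) >= self.limits[tag]:
            return False
        self.active[tag].add(run_id)
        return True

    def release(self, tag: str, run_id: str) -> None:
        self.active[tag].discard(run_id)

    def inspect(self, tag: str) -> Dict[str, object]:
        return {"tag": tag, "concurrency_limit": self.limits[tag],
                "active_slots": sorted(self.active[tag])}

    def ls(self) -> List[Dict[str, object]]:
        return [self.inspect(tag) for tag in sorted(self.limits)]


table = SlotTable()
table.create("example-limit", 2)
table.occupy("example-limit", "run-a")
table.occupy("example-limit", "run-b")
print(table.occupy("example-limit", "run-c"))  # False: both slots are taken
print(table.inspect("example-limit"))
# {'tag': 'example-limit', 'concurrency_limit': 2, 'active_slots': ['run-a', 'run-b']}
table.release("example-limit", "run-a")
print(table.inspect("example-limit")["active_slots"])  # ['run-b']
```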
By following these steps, you ensure that tasks run with `map` never exceed the concurrency limit set for their tags, which keeps concurrent execution within your system's capabilities and requirements.
Jason Wiener
12/22/2023, 11:23 PM
Marvin
12/22/2023, 11:24 PM
Jason Wiener
12/22/2023, 11:27 PM
Marvin
12/22/2023, 11:28 PM
Jason Wiener
12/22/2023, 11:30 PM
Marvin
12/22/2023, 11:30 PM
```python
from prefect import task, flow

# Assumes a tag-based task run concurrency limit exists for the tag "example-limit",
# created for example with `prefect concurrency-limit create example-limit 3`
# or in the Prefect UI. The tag below is what ties this task to that limit.
@task(tags=["example-limit"])
def example_task(x):
    # Task implementation that should be limited by concurrency
    pass

@flow
def example_flow():
    # An iterable of data to be processed by `example_task`
    data = [1, 2, 3, 4, 5]
    # Every mapped run carries the "example-limit" tag, so that tag's limit applies
    example_task.map(data)

if __name__ == "__main__":
    example_flow()
```
In this example, you would create a task run concurrency limit for the tag "example-limit" (with `prefect concurrency-limit create example-limit <LIMIT>` or in the Prefect UI), and then use that same string as a tag on `example_task`. The `map` method applies the task to each element in `data`, and the runs of `example_task` are limited by the concurrency limit set for the "example-limit" tag.
It's important to note that the tag only associates the task with the limit; the actual enforcement happens in Prefect's orchestration layer, which delays a tagged task run's transition to Running until a slot is free. Tags refer to task run concurrency limits, not to global concurrency limits: a global concurrency limit created in the UI is used in code through the `concurrency` context manager (`from prefect.concurrency.sync import concurrency`) rather than through task tags.
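Prefect 2.13 and later also expose global concurrency limits directly in code, via `from prefect.concurrency.sync import concurrency` used as `with concurrency("example-limit", occupy=1): ...`. The stdlib sketch below is a hypothetical model of that acquire/release pattern, including an `occupy` count that takes several slots at once; `SlotPool` and its `slots` method are illustrative names, not Prefect's implementation.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Iterator


class SlotPool:
    """Hypothetical model of one named global concurrency limit."""

    def __init__(self, name: str, limit: int) -> None:
        self.name = name
        self.limit = limit
        self.in_use = 0
        self.peak = 0
        self._cond = threading.Condition()

    @contextmanager
    def slots(self, occupy: int = 1) -> Iterator[None]:
        """Hold `occupy` slots for the duration of the block, waiting until they are free."""
        if occupy > self.limit:
            raise ValueError(f"cannot occupy {occupy} of {self.limit} slots on {self.name!r}")
        with self._cond:
            self._cond.wait_for(lambda: self.in_use + occupy <= self.limit)
            self.in_use += occupy
            self.peak = max(self.peak, self.in_use)
        try:
            yield
        finally:
            with self._cond:
                self.in_use -= occupy
                self._cond.notify_all()  # wake waiters so they can re-check free slots


pool = SlotPool("example-limit", limit=4)


def heavy_job(x: int) -> int:
    with pool.slots(occupy=2):  # each job takes 2 of the 4 slots
        time.sleep(0.01)
        return x + 1


with ThreadPoolExecutor(max_workers=8) as executor:
    print(list(executor.map(heavy_job, range(6))))  # [1, 2, 3, 4, 5, 6]
print(pool.peak <= pool.limit)  # True: at most two jobs held slots at once
```

Rejecting an `occupy` larger than the limit up front avoids a waiter that could never be satisfied.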