<@ULVA73B9P> What is the best way to limit the num...
# marvin-in-the-wild
l
@Marvin What is the best way to limit the number of jobs concurrently submitted in prefect? I am hitting compute limits in AWS Glue.
m
You can limit the number of concurrent jobs in Prefect by using task concurrency limits. Here's a concise guide based on the gathered documentation:
* _Task Concurrency Limits using Tags_:
  - You can set a maximum number of concurrent task runs using tags.
  - Each tag can have an explicit concurrency limit.
  - When a task has multiple tags, it will only run if all tags have available concurrency.
  - Tags without explicit limits have unlimited concurrency.
  - Setting the concurrency limit to 0 for a tag will abort any task with that tag.

References:
- Guide on Task Concurrency Limits: here

This should help you manage AWS Glue compute limits effectively by limiting the tasks running concurrently.
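The tag rules above can be sketched as a small plain-Python model. This is only an illustration of the rules as described, not Prefect's implementation; `admit`, `limits`, and `active` are hypothetical names made up for this sketch.

```python
def admit(task_tags, limits, active):
    """Decide what happens to a task run under tag-based limits.

    task_tags: tags on the task run
    limits:    tag -> maximum concurrent runs (tags missing here are unlimited)
    active:    tag -> number of runs currently holding a slot
    Returns "run", "wait", or "abort".
    """
    decision = "run"
    for tag in task_tags:
        if tag not in limits:
            continue  # no explicit limit: unlimited concurrency
        if limits[tag] == 0:
            return "abort"  # a limit of 0 aborts the run outright
        if active.get(tag, 0) >= limits[tag]:
            decision = "wait"  # every limited tag needs a free slot
    return decision


limits = {"glue": 2, "disabled": 0}

print(admit(["glue"], limits, {"glue": 1}))              # run
print(admit(["glue", "other"], limits, {"glue": 2}))     # wait
print(admit(["glue", "disabled"], limits, {"glue": 1}))  # abort
```

A run with several tags starts only when each limited tag has room, which is why one full tag is enough to hold it back.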
l
@Marvin how do I create a tag and set the concurrency limit?
m
To use a tag with a concurrency limit in Prefect, follow these steps: 1. Pick a tag name (tags are plain strings and do not need to be defined separately). 2. Create the concurrency limit for that tag on your Prefect server or Cloud workspace, for example with the CLI. 3. Apply the tag to tasks. Here is an example:
```python
from prefect import flow, task

# The limit lives on the Prefect server / Cloud workspace, not in the flow code.
# Create it once, for example with the CLI:
#   prefect concurrency-limit create limited-task 3

@task(tags=["limited-task"])
def limited_task(x):
    # Some computation here
    return x * 2

@flow
def my_concurrent_flow():
    futures = []
    for i in range(10):
        futures.append(limited_task.submit(i))
    # Resolve the futures so the flow returns plain values
    return [future.result() for future in futures]

if __name__ == "__main__":
    my_concurrent_flow()
```
* Explanation:
  - The concurrency limit of `3` for the tag `"limited-task"` is created outside the flow code, here with `prefect concurrency-limit create`.
  - `limited_task` is decorated with this tag.
  - In the flow `my_concurrent_flow`, `limited_task` is submitted 10 times, but only 3 runs will execute concurrently. The limit applies to every task run carrying the tag, across all flows.

Reference: Task Concurrency Limits Documentation

This way, you can control the number of AWS Glue jobs running concurrently and avoid hitting compute limits.
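To see what "only 3 will run concurrently" means in practice, here is a local analogy using a semaphore from the Python standard library. This is not how Prefect enforces tag limits (those are tracked by the Prefect server); `run_with_limit` is a hypothetical helper written only for this sketch.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_limit(n_tasks, limit):
    """Run n_tasks workers with at most `limit` inside the guarded section.

    Returns (results, peak), where peak is the highest number of workers
    observed inside the guarded section at the same time.
    """
    slots = threading.BoundedSemaphore(limit)  # plays the role of the tag limit
    lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def limited_work(x):
        with slots:  # wait here until a slot is free
            with lock:
                state["running"] += 1
                state["peak"] = max(state["peak"], state["running"])
            time.sleep(0.01)  # stand-in for the real job
            with lock:
                state["running"] -= 1
        return x * 2

    # One thread per task, so the semaphore is the only thing holding work back
    with ThreadPoolExecutor(max_workers=n_tasks) as pool:
        results = list(pool.map(limited_work, range(n_tasks)))
    return results, state["peak"]


results, peak = run_with_limit(n_tasks=10, limit=3)
print(results)
print("peak concurrency:", peak)  # never more than 3
```

All 10 calls are submitted at once, yet the peak never exceeds the limit; the remaining calls simply wait for a slot.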
l
@Marvin What if my task should have different concurrency depending on the flow it is used in?
m
A tag's concurrency limit is shared by every task run carrying that tag, so it cannot be redefined inside a flow. To give the same task different limits in different flows, follow these steps: 1. Create one tag per flow, each with its own concurrency limit. 2. In each flow, apply that flow's tag to the task with `with_options` before submitting it. Here is a formatted example (the tag names `limited-task-a` and `limited-task-b` are just example names):
```python
from prefect import flow, task

# Create one limit per tag beforehand, for example with the CLI:
#   prefect concurrency-limit create limited-task-a 3
#   prefect concurrency-limit create limited-task-b 5

@task
def limited_task(x):
    # Some computation here
    return x * 2

@flow
def my_concurrent_flow_a():
    # Copy of the task carrying the tag whose limit is 3
    task_a = limited_task.with_options(tags=["limited-task-a"])

    futures = []
    for i in range(10):
        futures.append(task_a.submit(i))
    return [future.result() for future in futures]

@flow
def my_concurrent_flow_b():
    # Copy of the task carrying the tag whose limit is 5
    task_b = limited_task.with_options(tags=["limited-task-b"])

    futures = []
    for i in range(10):
        futures.append(task_b.submit(i))
    return [future.result() for future in futures]

if __name__ == "__main__":
    my_concurrent_flow_a()
    my_concurrent_flow_b()
```
* Explanation:
  - Each flow (`my_concurrent_flow_a` and `my_concurrent_flow_b`) uses its own tag, and each tag has its own concurrency limit created outside the flow code.
  - `limited_task.with_options(tags=[...])` returns a copy of the task with that tag, so the original task definition stays untagged.
  - Because the two flows use different tags, their runs count against different limits (3 and 5 here).

References:
- Task Concurrency Limits Documentation
- Relevant issues: No specific documentation or issue describes this exact setup; the approach is inferred from the general concurrency and tagging documentation.

This approach allows the same task to have varying concurrency limits in different flows.