Lachlan Casey
08/21/2024, 3:53 AM
Marvin
08/21/2024, 3:53 AM
Lachlan Casey
08/21/2024, 5:41 AM
Marvin
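08/21/2024, 5:42 AM
python
import asyncio

from prefect import flow, task
from prefect.client.orchestration import get_client


async def set_tag_limit(tag: str, limit: int) -> None:
    # Tag-based limits are stored on the Prefect server, not in the flow code.
    # CLI equivalent: prefect concurrency-limit create limited-task 3
    async with get_client() as client:
        await client.create_concurrency_limit(tag=tag, concurrency_limit=limit)


@task(tags=["limited-task"])
def limited_task(x):
    # Some computation here
    return x * 2


@flow
def my_concurrent_flow():
    # Submit all ten runs; the server lets at most 3 tagged runs execute at once
    futures = [limited_task.submit(i) for i in range(10)]
    return [future.result() for future in futures]


if __name__ == "__main__":
    asyncio.run(set_tag_limit("limited-task", 3))
    my_concurrent_flow()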
Explanation:
- A concurrency limit of 3 is defined for the tag "limited-task".
- limited_task is decorated with this tag.
- In the flow my_concurrent_flow, limited_task is submitted 10 times, but at most 3 runs execute concurrently.
Reference: Task Concurrency Limits Documentation
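One caveat if each limited_task run wraps a single AWS Glue job: the tag limit counts running task runs, so it only caps Glue jobs when the task blocks until its job has finished. Below is a minimal sketch of such a blocking helper, under that assumption. The names run_glue_job_and_wait and poll_seconds are illustrative and not from this thread; glue_client is assumed to be a boto3 Glue client (boto3.client("glue")), passed in so the sketch itself needs only the standard library.

```python
import time

# Glue job run states after which the run will not change any more
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def run_glue_job_and_wait(glue_client, job_name, poll_seconds=30.0):
    """Start one Glue job run and block until it reaches a terminal state.

    Hypothetical helper: call it from inside the tagged task so the task run
    (and therefore its concurrency slot) stays occupied while the job runs.
    """
    run_id = glue_client.start_job_run(JobName=job_name)["JobRunId"]
    while True:
        job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = job_run["JobRunState"]
        if state in TERMINAL_STATES:
            return state
        # Still starting, running, or stopping: wait before polling again
        time.sleep(poll_seconds)
```

Inside limited_task this would be called as run_glue_job_and_wait(boto3.client("glue"), "my-job"), where "my-job" is a placeholder job name. A task that only starts the job and returns immediately would release its slot while the Glue job is still running.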
This way, you can control the number of AWS Glue jobs running concurrently and avoid hitting compute limits.
Lachlan Casey
08/21/2024, 5:45 AM
Marvin
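08/21/2024, 5:45 AM
To vary the limit per flow, give the task a flow-specific tag inside each flow and create a separate concurrency limit for each tag. Tag limits are enforced by the Prefect server per tag, so two flows cannot hold different limits for the same tag.
Here is a formatted example:
python
import asyncio

from prefect import flow, task
from prefect.client.orchestration import get_client


async def set_tag_limits(limits: dict) -> None:
    # Create one server-side limit per tag
    async with get_client() as client:
        for tag, limit in limits.items():
            await client.create_concurrency_limit(tag=tag, concurrency_limit=limit)


@task
def limited_task(x):
    # Some computation here
    return x * 2


@flow
def my_concurrent_flow_a():
    # Tag the task for this flow; "limited-task-a" is capped at 3
    tagged = limited_task.with_options(tags=["limited-task-a"])
    futures = [tagged.submit(i) for i in range(10)]
    return [future.result() for future in futures]


@flow
def my_concurrent_flow_b():
    # A different tag carries a different limit; "limited-task-b" is capped at 5
    tagged = limited_task.with_options(tags=["limited-task-b"])
    futures = [tagged.submit(i) for i in range(10)]
    return [future.result() for future in futures]


if __name__ == "__main__":
    asyncio.run(set_tag_limits({"limited-task-a": 3, "limited-task-b": 5}))
    my_concurrent_flow_a()
    my_concurrent_flow_b()
Explanation:
- Each flow applies its own tag to limited_task with with_options before submitting it: "limited-task-a" in my_concurrent_flow_a and "limited-task-b" in my_concurrent_flow_b.
- Each tag has its own concurrency limit (3 and 5), created once on the server before the flows run.
- limited_task itself is defined without tags, so the same task can be reused under either limit.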
References:
- Task Concurrency Limits Documentation
- Relevant issues: No specific documentation or issues directly describing this exact setup, but the concept is inferred from general concurrency and tagging documentation.
This approach allows the same task to have varying concurrency limits in different flows.
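To make "a limit per tag" concrete, here is a small plain-Python analogy that does not use Prefect at all. A semaphore stands in for the server-side limit of one tag, and the function reports the highest number of fake task runs that were ever active at once. Everything in it (peak_concurrency, tag_limits, work_seconds) is a hypothetical name for illustration; Prefect enforces its limits on the server, not with local semaphores.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(tag_limits, tag, n_runs=10, work_seconds=0.02):
    """Run n_runs fake tasks under the limit for `tag` and return the peak
    number that were running at the same time."""
    slots = threading.Semaphore(tag_limits[tag])  # one slot per allowed run
    lock = threading.Lock()
    running = 0
    peak = 0

    def fake_task(_):
        nonlocal running, peak
        with slots:  # wait here until a slot for this tag is free
            with lock:
                running += 1
                peak = max(peak, running)
            time.sleep(work_seconds)  # stand-in for real work
            with lock:
                running -= 1

    # One thread per run, so every run tries to start immediately
    with ThreadPoolExecutor(max_workers=n_runs) as pool:
        list(pool.map(fake_task, range(n_runs)))
    return peak


if __name__ == "__main__":
    limits = {"limited-task-a": 3, "limited-task-b": 5}
    print(peak_concurrency(limits, "limited-task-a"))  # never more than 3
    print(peak_concurrency(limits, "limited-task-b"))  # never more than 5
```

The two tags are capped independently, which is why giving each flow its own tag is enough to give the same task different limits.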