Guilherme Lins
08/06/2024, 7:37 PMMarvin
08/06/2024, 7:37 PMpython
from prefect import task, flow
@task(tags=["database_task"])
def my_task(x):
return x
@flow
def my_flow():
my_list = list(range(1000))
results = my_task.map(my_list)
return results
if __name__ == "__main__":
# Set the task concurrency limit for 'database_task' tag
import prefect
prefect.context.concurrency_limits = {"database_task": 10}
my_flow()
Example of using Global Concurrency Limits:
bash
# Create a global concurrency limit
prefect gcl create my-concurrency-limit --limit 10
# Use the concurrency limit in a Prefect flow
from prefect import concurrency
@flow
def my_flow():
with concurrency("my-concurrency-limit"):
my_list = list(range(1000))
results = my_task.map(my_list)
return results
For more detailed information, check out the Prefect documentation:
Task Concurrency Limits
Global Concurrency Limits
Additionally, you can monitor relevant issues and discussions on GitHub regarding task concurrency and mapping improvements:
GitHub Issues on Limiting Concurrent Mapped Tasks
Hope this clears up how to manage concurrency limits in Prefect!