Guilherme Lins
08/06/2024, 7:37 PM
Marvin
08/06/2024, 7:37 PM
```python
from prefect import flow, task


# Task runs tagged "database_task" are subject to any concurrency limit set on that tag.
@task(tags=["database_task"])
def my_task(x):
    return x


@flow
def my_flow():
    my_list = list(range(1000))
    results = my_task.map(my_list)
    return results


if __name__ == "__main__":
    # Set the task concurrency limit for the 'database_task' tag before running.
    # Tag limits are created through the CLI rather than in code, e.g.:
    #   prefect concurrency-limit create database_task 10
    my_flow()
```
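As a rough mental model of what a limit of 10 on the `database_task` tag is meant to achieve, here is a plain-Python sketch using only the standard library. It is not how Prefect enforces limits, and `run_limited`, `LIMIT`, and the thread pool are names made up for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

LIMIT = 10  # hypothetical stand-in for the limit on the 'database_task' tag

_slots = threading.BoundedSemaphore(LIMIT)
_lock = threading.Lock()
_active = 0
peak = 0  # highest number of calls observed running at once


def my_task(x):
    return x


def run_limited(x):
    """Run my_task(x) only while holding one of the LIMIT slots."""
    global _active, peak
    with _slots:
        with _lock:
            _active += 1
            peak = max(peak, _active)
        try:
            time.sleep(0.001)  # pretend to do some database work
            return my_task(x)
        finally:
            with _lock:
                _active -= 1


# 50 worker threads compete, but at most LIMIT calls hold a slot at any moment.
with ThreadPoolExecutor(max_workers=50) as pool:
    results = list(pool.map(run_limited, range(1000)))

print(len(results), peak)
```

All 1000 inputs still get processed; the limit only controls how many are in flight at the same time.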
Example of using Global Concurrency Limits:
```bash
# Create a global concurrency limit
prefect gcl create my-concurrency-limit --limit 10
```
```python
# Use the concurrency limit in a Prefect flow
from prefect import flow
from prefect.concurrency.sync import concurrency


@flow
def my_flow():
    # Occupies one slot of "my-concurrency-limit" while this block runs
    with concurrency("my-concurrency-limit"):
        my_list = list(range(1000))
        results = my_task.map(my_list)  # my_task as defined in the example above
    return results
```
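One thing worth double-checking with the context-manager style is where the slot is held. The plain-Python analogy below is not Prefect code, and `SlotLimit`, `PeakCounter`, and `run` are hypothetical names. It contrasts holding one slot around the whole submit step with holding a slot inside each unit of work.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager


class SlotLimit:
    """Hypothetical named limit with a fixed number of slots."""

    def __init__(self, limit):
        self._sem = threading.BoundedSemaphore(limit)

    @contextmanager
    def occupy(self):
        self._sem.acquire()
        try:
            yield
        finally:
            self._sem.release()


class PeakCounter:
    """Records the highest number of work items running at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    @contextmanager
    def track(self):
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        try:
            yield
        finally:
            with self._lock:
                self.active -= 1


def run(items, limit, per_item):
    counter = PeakCounter()

    def work(x):
        if per_item:
            # Slot held inside each unit of work: this caps the workers.
            with limit.occupy(), counter.track():
                time.sleep(0.005)
                return x
        with counter.track():
            time.sleep(0.005)
            return x

    with ThreadPoolExecutor(max_workers=20) as pool:
        if per_item:
            results = list(pool.map(work, items))
        else:
            # Slot held only around the submitting code: one slot is used,
            # and the workers themselves are not throttled by the limit.
            with limit.occupy():
                results = list(pool.map(work, items))
    return results, counter.peak


results_outer, peak_outer = run(range(200), SlotLimit(5), per_item=False)
results_inner, peak_inner = run(range(200), SlotLimit(5), per_item=True)
print(peak_outer, peak_inner)
```

In this sketch only the per-item placement is guaranteed to keep the peak at or below the limit. If the aim in Prefect is to throttle each mapped task run, it may be worth checking whether the `with concurrency(...)` block belongs inside `my_task` instead.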
For more detailed information, check out the Prefect documentation:
Task Concurrency Limits
Global Concurrency Limits
Additionally, you can monitor relevant issues and discussions on GitHub regarding task concurrency and mapping improvements:
GitHub Issues on Limiting Concurrent Mapped Tasks
Hope this clears up how to manage concurrency limits in Prefect!