Paweł Biernat
06/04/2024, 9:51 AMMarvin
06/04/2024, 9:51 AMpython
from prefect import flow, task
from prefect.concurrency.sync import concurrency
@task(tags=["limited-task"])
def my_task(x, y):
with concurrency("limited-task", occupy=1):
return x + y
@flow
def my_flow():
for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
my_task.submit(x, y)
if __name__ == "__main__":
my_flow()
This example uses the concurrency
context manager and sets a concurrency limit on a specific tag.
2. Using Global Concurrency Limits:
- Prefect lets you set global concurrency limits that span multiple tasks and flow runs.
- These limits are configured through work pools and queues and can be adjusted using the Prefect CLI or API.
shell
prefect concurrency-limit create "my_limit" --limit 10
This command sets a global concurrency limit of 10 for tasks tagged with my_limit
.
3. Configuring the Prefect Client:
- You can also set and inspect limits programmatically using the Prefect Client.
python
from prefect import get_client
async def set_concurrency_limit():
async with get_client() as client:
await client.create_concurrency_limit("my_limit", limit=10)
asyncio.run(set_concurrency_limit())
### Documentation and Resources:
- Concurrency Management: Prefect Concurrency Management
- Concurrency Limits Guide: Global Concurrency Limits
For a more detailed explanation, you might want to check these resources or explore the Prefect GitHub issues for any related discussions on the topic.
### Relevant GitHub Issues:
- Concurrency Limit Setting "Active Slots"
- Check concurrency limit before submitting tasks
- Global concurrency not applied to tasks with DaskTaskRunner
These links might offer additional insight or potential solutions if you encounter specific issues related to concurrency limits.