Paweł Biernat
06/04/2024, 9:51 AM
Marvin
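06/04/2024, 9:51 AM
```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task(tags=["limited-task"])
def my_task(x, y):
    # Occupy one slot of the concurrency limit named "limited-task"
    # while the block runs; the slot is released on exit.
    with concurrency("limited-task", occupy=1):
        return x + y

@flow
def my_flow():
    # Submit the tasks to the task runner so they can run concurrently.
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        my_task.submit(x, y)

if __name__ == "__main__":
    my_flow()
```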
This example uses the `concurrency` context manager, so each call to `my_task` occupies one slot of the concurrency limit named `limited-task` while it runs.
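To make the slot idea more concrete, here is a rough standard-library analogy. It is not how Prefect implements limits; it only assumes that a limit behaves like a fixed pool of slots. The names `LIMIT`, `slots`, and `limited_add` are made up for this illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

LIMIT = 2  # hypothetical number of slots in the limit
slots = threading.BoundedSemaphore(LIMIT)

counter_lock = threading.Lock()
running = 0       # calls currently holding a slot
peak_running = 0  # highest number of calls seen holding a slot at once


def limited_add(x, y):
    """Add two numbers while holding one slot (analogous to occupy=1)."""
    global running, peak_running
    with slots:  # blocks until a slot is free, releases it on exit
        with counter_lock:
            running += 1
            peak_running = max(peak_running, running)
        time.sleep(0.05)  # stand-in for real work
        with counter_lock:
            running -= 1
        return x + y


pairs = [(1, 2), (2, 3), (3, 4), (4, 5)]
# Four worker threads compete for two slots.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(lambda p: limited_add(*p), pairs))

print(results)
print("peak concurrency never exceeded the limit:", peak_running <= LIMIT)
```

All four calls are submitted at once, but at most `LIMIT` of them hold a slot at any moment; the rest wait until a slot is released.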
2. Using Global Concurrency Limits:
- Prefect lets you set global concurrency limits that span multiple tasks and flow runs.
- These limits are configured through work pools and queues and can be adjusted using the Prefect CLI or API.
```shell
prefect concurrency-limit create my_limit 10
```
This command sets a concurrency limit of 10 for task runs tagged with `my_limit`.
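As a rough sketch of how a tag-based limit gates task runs: a task run can start only if every limited tag it carries still has a free slot, and tags without a configured limit are treated as unlimited. The function `can_start` and the `limits` and `active` dictionaries below are hypothetical, purely for illustration; Prefect tracks this state on the server.

```python
def can_start(task_tags, limits, active):
    """Return True if every limited tag on the task has a free slot.

    task_tags: tags attached to the task run
    limits:    mapping of tag -> maximum concurrent task runs
    active:    mapping of tag -> task runs currently holding a slot
    """
    for tag in task_tags:
        if tag not in limits:
            continue  # no limit configured for this tag
        if active.get(tag, 0) >= limits[tag]:
            return False  # this tag has no free slot
    return True


limits = {"my_limit": 10}

# All 10 slots taken: a new run tagged my_limit has to wait.
print(can_start(["my_limit"], limits, {"my_limit": 10}))

# One slot free, and the extra tag has no limit: the run may start.
print(can_start(["my_limit", "other"], limits, {"my_limit": 9}))
```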
3. Configuring the Prefect Client:
- You can also set and inspect limits programmatically using the Prefect Client.
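```python
import asyncio

from prefect import get_client

async def set_concurrency_limit():
    # Create a limit of 10 concurrent task runs for the tag "my_limit".
    async with get_client() as client:
        await client.create_concurrency_limit(tag="my_limit", concurrency_limit=10)

asyncio.run(set_concurrency_limit())
```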
### Documentation and Resources:
- Concurrency Management: Prefect Concurrency Management
- Concurrency Limits Guide: Global Concurrency Limits
For a more detailed explanation, you might want to check these resources or explore the Prefect GitHub issues for any related discussions on the topic.
### Relevant GitHub Issues:
- Concurrency Limit Setting "Active Slots"
- Check concurrency limit before submitting tasks
- Global concurrency not applied to tasks with DaskTaskRunner
These links might offer additional insight or potential solutions if you encounter specific issues related to concurrency limits.