Samuel Hinton
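01/02/2025, 2:42 AM
```python
import asyncio

from prefect import flow, task


def task_wrapper(semaphore):
    # Decorator factory: caps how many calls to fn are in flight at once
    def wrapper(fn):
        async def inner(*args, **kwargs):
            async with semaphore:
                return await fn(*args, **kwargs)
        return inner
    return wrapper


limit = asyncio.Semaphore(4)


@task_wrapper(limit)
@task()
async def some_task(i):
    print(i)


@flow
async def some_flow():
    # Schedule all calls concurrently; the semaphore lets at most 4 run at a time
    await asyncio.gather(*(some_task(i) for i in range(100)))
```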
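As a sanity check on the semaphore approach, here is a minimal sketch that uses only `asyncio` and no Prefect. A hypothetical `limited` decorator, shaped like `task_wrapper`, caps in-flight calls and records the peak number of calls running at once. The names `limited`, `work`, and `main`, and the 0.01 s sleep, are illustrative assumptions. Keep in mind that a semaphore like this only limits calls within a single Python process and event loop.

```python
import asyncio
from functools import wraps


def limited(semaphore):
    """Hypothetical decorator factory: cap concurrent calls with a semaphore."""
    def decorator(fn):
        @wraps(fn)
        async def inner(*args, **kwargs):
            async with semaphore:
                return await fn(*args, **kwargs)
        return inner
    return decorator


async def main(n_calls=20, cap=4):
    # Create the semaphore inside the running event loop
    semaphore = asyncio.Semaphore(cap)
    running = 0
    peak = 0

    @limited(semaphore)
    async def work(i):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # simulate I/O-bound work
        running -= 1
        return i

    results = await asyncio.gather(*(work(i) for i in range(n_calls)))
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(main())
    print(f"completed={len(results)} peak_concurrency={peak}")
```

With the defaults, all 20 calls complete and the peak never exceeds the cap of 4.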
Is there a nicer way using only Prefect (so it works nicely with synchronous `.submit()`), something akin to:
```python
@task(concurrency_limit=4)
async def some_task(i):
    print(i)

@flow
async def some_flow():
    for i in range(100):
        some_task(i)
```
Marvin
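01/02/2025, 2:43 AM
```python
from prefect import flow, tags, task

@task
def some_task(i):
    print(i)

@flow
def some_flow():
    # Every task run created inside this block gets the "limited" tag
    with tags("limited"):
        futures = [some_task.submit(i) for i in range(100)]
    # Block until all submitted runs have finished
    for future in futures:
        future.wait()

if __name__ == "__main__":
    some_flow()
```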
To set a concurrency limit using the CLI:
```sh
prefect concurrency-limit create limited 4
```
This will ensure that no more than 4 tasks with the "limited" tag run concurrently.
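To make the tag semantics concrete, here is a toy, in-process model. It is not Prefect's implementation, which enforces tag limits on the server across all flow runs. In this model, each tag with a configured limit gets a fixed pool of slots, and a run must hold one slot for every limited tag it carries before it starts. Runs without a limited tag are not throttled at all. `TagLimiter`, `run`, `some_task`, and the 0.01 s sleep are hypothetical names and values chosen for illustration.

```python
import asyncio


class TagLimiter:
    """Toy, in-process model of tag-based concurrency limits (hypothetical)."""

    def __init__(self, limits):
        # One semaphore per limited tag, e.g. {"limited": 4}
        self._slots = {tag: asyncio.Semaphore(n) for tag, n in limits.items()}

    async def run(self, tags, coro_fn, *args):
        # Only tags that have a configured limit are throttled
        needed = [self._slots[t] for t in sorted(set(tags)) if t in self._slots]
        acquired = []
        try:
            # Take one slot per limited tag, in a fixed (sorted) order
            for sem in needed:
                await sem.acquire()
                acquired.append(sem)
            return await coro_fn(*args)
        finally:
            for sem in reversed(acquired):
                sem.release()


async def main():
    limiter = TagLimiter({"limited": 4})
    running = {"limited": 0, "untagged": 0}
    peak = {"limited": 0, "untagged": 0}

    async def some_task(kind, i):
        running[kind] += 1
        peak[kind] = max(peak[kind], running[kind])
        await asyncio.sleep(0.01)  # simulate work
        running[kind] -= 1
        return i

    tagged = [limiter.run(["limited"], some_task, "limited", i) for i in range(20)]
    untagged = [limiter.run([], some_task, "untagged", i) for i in range(10)]
    await asyncio.gather(*tagged, *untagged)
    return peak


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this model, the 10 untagged runs all start at once, while the 20 tagged runs never have more than 4 in flight.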
For more details, refer to the Prefect documentation on task concurrency limits and global concurrency limits.
Marvin
01/02/2025, 2:43 AM
1. Tagging Tasks: Assign a tag to the tasks you want to limit, such as "limited".
2. Setting Concurrency Limits: Use the Prefect CLI or Prefect API to set a concurrency limit on this tag. This can be done as follows:
```bash
prefect concurrency-limit create limited 4
```
3. Flow and Task Definition: Use the tag in your task definition.
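```python
from prefect import flow, task

@task(tags=["limited"])
async def some_task(i):
    print(i)

@flow
async def some_flow():
    futures = [some_task.submit(i) for i in range(100)]
    # Wait for all runs; the "limited" tag limit caps how many run at once
    for future in futures:
        future.wait()
```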
This setup ensures that no more than 4 task runs tagged "limited" (here, instances of `some_task`) run concurrently across all flow runs that include them. For more details, you can refer to the Prefect 3.x documentation on task run limits and global concurrency limits.
This approach is preferable to a hand-rolled semaphore because the Prefect server enforces the limit, not a single Python process. It therefore also works with synchronous task submission via `.submit()` and applies across separate flow runs.
Samuel Hinton
01/02/2025, 2:49 AM
Marvin
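01/02/2025, 2:49 AM
```python
import asyncio

from prefect import flow, task
from prefect.client.orchestration import get_client

@task(tags=["limited"])  # the limit below only applies to runs with this tag
def some_task(i):
    print(i)

@flow
async def some_flow():
    async with get_client() as client:
        # Create (or update) the tag-based concurrency limit for "limited"
        await client.create_concurrency_limit(tag="limited", concurrency_limit=4)
    futures = [some_task.submit(i) for i in range(100)]
    for future in futures:
        future.wait()

if __name__ == "__main__":
    asyncio.run(some_flow())
```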
This code snippet shows how to use the `PrefectClient` to set a tag-based concurrency limit programmatically from within your flow. Replace "limited" and 4 with your own tag and limit. The limit only applies to task runs that carry that tag.
For more on using the Prefect API, refer to the Prefect 3.x documentation on task run limits. This approach lets you adjust concurrency limits programmatically, for example based on your flow's logic or on external conditions.
Nate
01/02/2025, 4:40 PM
Marvin
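01/02/2025, 4:40 PM
```python
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

# Assumes a global concurrency limit named "limited" with 4 slots already exists,
# e.g. created with: prefect gcl create limited --limit 4

@task
async def some_task(i):
    # Hold 1 slot of the "limited" limit while this run executes
    async with concurrency("limited", occupy=1):
        print(f"Processing {i}")

@flow
async def some_flow():
    futures = [some_task.submit(i) for i in range(100)]
    for future in futures:
        future.wait()

if __name__ == "__main__":
    asyncio.run(some_flow())
```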
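### Key Points:
- Concurrency Context Manager: `concurrency(names, occupy=1)` takes `occupy` slots from the global concurrency limit named "limited" and holds them until the block exits. The size of the limit (4 here) is configured separately, for example with `prefect gcl create limited --limit 4`. The second argument is the number of slots to occupy, not the limit itself. To cap concurrent `some_task` executions at 4, the context manager has to wrap the task body so that each run holds one slot while it executes. Wrapping only the submission loop holds slots while runs are being submitted, not while they execute.
- Dynamic Control: Because slots are acquired in code, you can scope a limit to a specific block inside a task or flow rather than to every run that carries a tag.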
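The difference between the number of slots a block occupies and the size of the limit is easy to miss. In Prefect's `concurrency(names, occupy=1, ...)`, the second positional argument is the number of slots to take from an existing global limit. Below is a toy `asyncio` model of a 4-slot limit. It is not Prefect's implementation: `SlotPool`, `peak_concurrency`, and the 0.01 s sleep are hypothetical names and values that only mimic the slot semantics. When each run occupies 1 slot, up to 4 runs overlap. When each run occupies 4 slots, as `concurrency("limited", 4)` would request, runs are fully serialized.

```python
import asyncio


class SlotPool:
    """Toy model of a global concurrency limit with `limit` slots (hypothetical)."""

    def __init__(self, limit):
        self.limit = limit
        self.free = limit
        self._cond = asyncio.Condition()

    async def acquire(self, occupy):
        if occupy > self.limit:
            raise ValueError("cannot occupy more slots than the limit")
        async with self._cond:
            # Wait until enough slots are free, then take them all at once
            await self._cond.wait_for(lambda: self.free >= occupy)
            self.free -= occupy

    async def release(self, occupy):
        async with self._cond:
            self.free += occupy
            self._cond.notify_all()


async def peak_concurrency(occupy, n_runs=8, limit=4):
    pool = SlotPool(limit)
    running = 0
    peak = 0

    async def run():
        nonlocal running, peak
        await pool.acquire(occupy)
        running += 1
        peak = max(peak, running)
        try:
            await asyncio.sleep(0.01)  # simulate the body of the task
        finally:
            running -= 1
            await pool.release(occupy)

    await asyncio.gather(*(run() for _ in range(n_runs)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(peak_concurrency(occupy=1)))
    print(asyncio.run(peak_concurrency(occupy=4)))
```

Under these assumptions, `peak_concurrency(occupy=1)` returns 4 and `peak_concurrency(occupy=4)` returns 1.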
For more details, you can refer to the Prefect documentation on global concurrency limits. This approach keeps control over task execution in your own code.
Samuel Hinton
01/02/2025, 10:16 PM