Vincent Chéry
07/22/2025, 2:19 PM

Marvin
07/22/2025, 2:20 PM

Marvin
07/22/2025, 2:20 PM
bash
prefect concurrency-limit create child_flow 2
This setup will:
1. Allow only 2 instances of the child flow to run concurrently
2. Automatically queue any additional runs (the other 8 in your case) until a slot becomes available
3. Execute the queued runs as soon as a slot frees up
A few important notes:
1. The submit() method is crucial here as it allows for concurrent execution. If you used a regular call (child_flow(params)), the flows would run sequentially.
2. The concurrency limit is enforced based on the tag ("child_flow" in this example).
3. You can monitor the concurrency limits using:
bash
prefect concurrency-limit ls
4. If you need to modify the limit later, you can delete the existing limit and create a new one:
bash
prefect concurrency-limit delete child_flow
prefect concurrency-limit create child_flow new_limit
This approach provides exactly what you're looking for: the parent flow will submit all 10 runs, but only 2 will execute concurrently while the others wait in the queue.
Would you like me to explain any part of this in more detail or show you how to monitor the concurrent executions?

Vincent Chéry
07/22/2025, 2:22 PM

Marvin
07/22/2025, 2:22 PM

Marvin
07/22/2025, 2:23 PM
python
from prefect import flow, get_client
from typing import Dict
import asyncio

# First, let's create a function to set up the concurrency limit
async def setup_concurrency_limit(tag: str, limit: int):
    async with get_client() as client:
        # Delete the existing limit if there is one
        try:
            await client.delete_concurrency_limit_by_tag(tag)
        except Exception:
            pass
        # Create the new concurrency limit
        await client.create_concurrency_limit(
            tag=tag,
            concurrency_limit=limit
        )

# Your flows remain the same
@flow(tags=["child_flow"])
def child_flow(params: Dict):
    print(f"Processing parameters: {params}")

@flow
def parent_flow():
    parameter_sets = [{"set": i} for i in range(10)]
    for params in parameter_sets:
        child_flow.submit(params)

# Set up and run everything
async def main():
    # Set up a concurrency limit of 2 for the child flow
    await setup_concurrency_limit("child_flow", 2)
    # Run the parent flow
    parent_flow()

# Run everything
if __name__ == "__main__":
    asyncio.run(main())
You can also check the current concurrency limits programmatically:
python
async def check_concurrency_limits():
    async with get_client() as client:
        # Get all concurrency limits
        limits = await client.read_concurrency_limits(limit=100, offset=0)
        for limit in limits:
            print(f"Tag: {limit.tag}, Limit: {limit.concurrency_limit}")

# To check limits from synchronous code:
asyncio.run(check_concurrency_limits())
If you need to modify the limit later:
python
async def update_concurrency_limit(tag: str, new_limit: int):
    async with get_client() as client:
        # Delete the existing limit
        await client.delete_concurrency_limit_by_tag(tag)
        # Create the new limit
        await client.create_concurrency_limit(
            tag=tag,
            concurrency_limit=new_limit
        )
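For example, raising the limit for the child flow from synchronous code could look like this (a minimal sketch that reuses the update_concurrency_limit helper above; the tag and the new value are just illustrative):
python
import asyncio

# Bump the "child_flow" tag from 2 to 5 concurrent runs
asyncio.run(update_concurrency_limit("child_flow", 5))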
A few important notes:
1. This approach uses the Prefect client directly (a PrefectClient obtained via get_client()), giving you programmatic control over concurrency limits.
2. The code needs to be run in an async context because the client methods are asynchronous.
3. The concurrency limit will still be enforced based on the tag ("child_flow" in this example).
4. The parent flow's submissions will still be automatically queued if they exceed the concurrency limit.
This gives you the same functionality as the CLI approach but allows you to manage everything through Python code. This can be particularly useful if you want to:
- Automate the setup of concurrency limits
- Dynamically adjust limits based on conditions in your code (see the sketch below)
- Integrate concurrency management into your application logic
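As a rough illustration of the dynamic case, here is a sketch that reuses the update_concurrency_limit helper above; the time-of-day rule and the limit values are made up for the example:
python
import asyncio
from datetime import datetime

async def adjust_limit_for_time_of_day():
    # Hypothetical rule: allow more concurrent child flows overnight
    hour = datetime.now().hour
    new_limit = 5 if (hour >= 22 or hour < 6) else 2
    await update_concurrency_limit("child_flow", new_limit)

asyncio.run(adjust_limit_for_time_of_day())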
Would you like me to explain any part of this in more detail or show you how to monitor the concurrent executions programmatically?

Vincent Chéry
07/22/2025, 2:37 PM
What about the run_deployment function? Would it not be an option for the parent flow to call run_deployment to run the child flow and rely on the ConcurrencyLimitConfig of the deployment?

Marvin
07/22/2025, 2:37 PM

Marvin
07/22/2025, 2:39 PM
bash
prefect work-queue set-concurrency-limit "concurrent-queue" 2
Or programmatically:
python
from prefect import get_client

async def setup_work_queue():
    async with get_client() as client:
        # Create the work queue if it doesn't exist
        queue = await client.create_work_queue(
            name="concurrent-queue",
            work_pool_name="your-work-pool"
        )
        # Set the concurrency limit
        await client.update_work_queue(
            queue.id,
            concurrency_limit=2
        )
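For the work queue limit to apply, the child flow's deployment has to target that queue. Roughly, the deployment could be created like this (a sketch only; the repository URL, entrypoint, and work pool name are placeholders, and the exact deploy signature depends on your Prefect version):
python
from prefect import flow

# Deploy the child flow so its runs are picked up from the rate-limited queue
flow.from_source(
    source="https://github.com/your-org/your-repo",  # placeholder source repository
    entrypoint="flows.py:child_flow",                # placeholder entrypoint
).deploy(
    name="child-flow-deployment",
    work_pool_name="your-work-pool",
    work_queue_name="concurrent-queue",
)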
This approach has several advantages:
1. It's more declarative - the concurrency is part of your deployment configuration
2. It's more maintainable - you can modify concurrency limits through the work queue without changing code
3. It leverages Prefect's built-in deployment infrastructure
4. You can monitor and manage the queue through the Prefect UI
The work queue concurrency limit will ensure that only 2 instances of your child flow run concurrently, while the others wait in the queue. The parent flow will still submit all 10 runs, but they'll be executed according to the concurrency limits of the work queue.
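Tying this back to the run_deployment question, the parent flow could trigger each child run against that deployment, for example (a sketch; the deployment name is a placeholder following the usual "flow-name/deployment-name" convention):
python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent_flow():
    parameter_sets = [{"set": i} for i in range(10)]
    for params in parameter_sets:
        # timeout=0 returns as soon as the run is submitted instead of waiting for it,
        # so all 10 child runs are created and the work queue limit decides when each executes
        run_deployment(
            name="child-flow/child-flow-deployment",  # placeholder deployment name
            parameters={"params": params},
            timeout=0,
        )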
Would you like me to show you how to monitor the queue status or explain any part of this in more detail?