# ask-marvin
v
@Marvin I would like a flow to call a child flow with different sets of parameters. Can I set a concurrency limit at the child flow level? Let's say my parent flow calls the child flow 10 times with 10 different sets of parameters, and I want only 2 concurrent runs of the child flow, with the other 8 runs enqueued. Is this possible?
m
thought for 56.6 seconds
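Tag-based concurrency limits apply to task runs, not flow runs, so the way to get this is to call the child flow from inside a task tagged `child_flow` (Prefect 3 allows flows to be called from tasks) and limit that tag to 2:
```bash
prefect concurrency-limit create child_flow 2
```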
This setup will:
1. Allow only 2 task runs tagged `child_flow` at a time, and therefore only 2 child flow runs if each tagged task calls the child flow
2. Hold any additional runs (the other 8 in your case) until a slot becomes available
3. Start the waiting runs as soon as a slot frees up

A few important notes:
1. Wrap the child-flow call in a task and submit that task with `.submit()` so the runs execute concurrently. Flows themselves have no `.submit()` method, and calling `child_flow(params)` directly from the parent runs the child flows one after another.
2. The concurrency limit is enforced by tag (`child_flow` in this example), so it covers every task run carrying that tag, not only the ones started by this parent flow.
3. You can list the existing concurrency limits with:
```bash
prefect concurrency-limit ls
```
4. If you need to modify the limit later, you can delete the existing limit and create a new one:
```bash
prefect concurrency-limit delete child_flow
prefect concurrency-limit create child_flow <new_limit>
```
This approach provides exactly what you're looking for: the parent flow will submit all 10 runs, but only 2 will execute concurrently while the others wait in the queue. Would you like me to explain any part of this in more detail or show you how to monitor the concurrent executions?
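Why `.submit()` matters can be illustrated without Prefect at all. In the stdlib analogy below, a `threading.Semaphore(2)` plays the role of the tag's concurrency limit, and `child_run`, `PeakCounter`, `run_sequentially` and `run_submitted` are hypothetical names; it mirrors the idea, not Prefect's scheduler. Calling the runs one by one never has more than one active, whereas handing all ten to a thread pool lets two work at once while the other eight wait for a slot.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Tuple


class PeakCounter:
    """Thread-safe count of active runs that also remembers the peak."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def enter(self) -> None:
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)

    def exit(self) -> None:
        with self._lock:
            self.active -= 1


def child_run(params: Dict, counter: PeakCounter, slots: threading.Semaphore) -> Dict:
    """Hypothetical stand-in for one child flow run behind a shared limit."""
    with slots:  # blocks until one of the slots is free
        counter.enter()
        time.sleep(0.05)  # simulate work
        counter.exit()
    return params


def run_sequentially(parameter_sets: List[Dict], limit: int = 2) -> Tuple[List[Dict], int]:
    counter, slots = PeakCounter(), threading.Semaphore(limit)
    # Direct calls: each run finishes before the next one starts
    results = [child_run(p, counter, slots) for p in parameter_sets]
    return results, counter.peak


def run_submitted(parameter_sets: List[Dict], limit: int = 2) -> Tuple[List[Dict], int]:
    counter, slots = PeakCounter(), threading.Semaphore(limit)
    # Submitted calls: every run is handed to the pool up front and the
    # semaphore lets at most `limit` of them work at the same time
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = [pool.submit(child_run, p, counter, slots) for p in parameter_sets]
        results = [future.result() for future in futures]
    return results, counter.peak


if __name__ == "__main__":
    parameter_sets = [{"set": i} for i in range(10)]
    print("sequential peak:", run_sequentially(parameter_sets)[1])  # prints 1
    print("submitted peak:", run_submitted(parameter_sets)[1])
```

The semaphore never lets the peak exceed the limit; submission is what gives the limit something to hold back.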
v
@Marvin Can I do this through the python API ?
m
thought for 95.4 seconds
Yes, you can manage concurrency limits programmatically! Here's how to implement your scenario using Python code:
```python
import asyncio
from typing import Dict

from prefect import flow, get_client, task
from prefect.exceptions import ObjectNotFound


# First, a function to (re)create the tag-based concurrency limit
async def setup_concurrency_limit(tag: str, limit: int):
    async with get_client() as client:
        # Delete the existing limit if there is one
        try:
            await client.delete_concurrency_limit_by_tag(tag)
        except ObjectNotFound:
            pass

        # Create the new concurrency limit
        await client.create_concurrency_limit(tag=tag, concurrency_limit=limit)


@flow
def child_flow(params: Dict):
    print(f"Processing parameters: {params}")


# Tag-based limits apply to task runs, so the tag goes on a task that
# calls the child flow (flows can be called from tasks in Prefect 3)
@task(tags=["child_flow"])
def run_child_flow(params: Dict):
    return child_flow(params)


@flow
def parent_flow():
    parameter_sets = [{"set": i} for i in range(10)]
    # Submit the tasks so the child flows can run concurrently
    futures = [run_child_flow.submit(params) for params in parameter_sets]
    # Wait for every child run to finish
    for future in futures:
        future.result()


if __name__ == "__main__":
    # Set up the concurrency limit of 2, then run the parent flow
    asyncio.run(setup_concurrency_limit("child_flow", 2))
    parent_flow()
```
You can also check the current concurrency limits programmatically:
```python
import asyncio

from prefect import get_client


async def check_concurrency_limits():
    async with get_client() as client:
        # Get the tag-based concurrency limits (first page of up to 100)
        limits = await client.read_concurrency_limits(limit=100, offset=0)
        for limit in limits:
            print(f"Tag: {limit.tag}, Limit: {limit.concurrency_limit}")


# To check limits from a script:
asyncio.run(check_concurrency_limits())
```
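`read_concurrency_limits` returns one page at a time, controlled by its `limit` and `offset` arguments. If you may have more tag-based limits than fit on one page, a small helper can keep requesting pages until a short one comes back. This is a generic sketch: `read_all_pages` and the in-memory `fake_page` reader are hypothetical, and it assumes the reader accepts `limit` and `offset` as keyword arguments.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def read_all_pages(
    fetch_page: Callable[..., Awaitable[List[T]]], page_size: int = 100
) -> List[T]:
    """Collect every item from a reader paginated with `limit`/`offset`."""
    items: List[T] = []
    offset = 0
    while True:
        page = await fetch_page(limit=page_size, offset=offset)
        items.extend(page)
        # A page shorter than page_size means there is nothing left to read
        if len(page) < page_size:
            return items
        offset += page_size


if __name__ == "__main__":
    # Hypothetical in-memory reader standing in for a paginated API call
    fake_limits = [f"tag-{i}" for i in range(250)]

    async def fake_page(limit: int, offset: int) -> List[str]:
        return fake_limits[offset:offset + limit]

    all_limits = asyncio.run(read_all_pages(fake_page))
    print(len(all_limits))  # prints 250
```

With a live client, the same helper could be awaited as `read_all_pages(client.read_concurrency_limits)` inside `async with get_client() as client:`.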
If you need to modify the limit later:
```python
from prefect import get_client


async def update_concurrency_limit(tag: str, new_limit: int):
    async with get_client() as client:
        # Delete the existing limit for this tag
        await client.delete_concurrency_limit_by_tag(tag)
        # Create the new limit
        await client.create_concurrency_limit(
            tag=tag,
            concurrency_limit=new_limit
        )
```
A few important notes:
1. This approach uses the Prefect client directly, giving you programmatic control over concurrency limits.
2. The code needs to run in an async context (for example via `asyncio.run`) because the client methods are asynchronous.
3. The concurrency limit is still enforced by tag (`child_flow` in this example), and it applies to task runs that carry that tag.
4. Submitted task runs that exceed the limit wait until a slot frees up.

This gives you the same functionality as the CLI approach but lets you manage everything from Python, which is particularly useful if you want to:
- Automate the setup of concurrency limits
- Dynamically adjust limits based on conditions in your code
- Integrate concurrency management into your application logic

Would you like me to explain any part of this in more detail or show you how to monitor the concurrent executions programmatically?
v
@Marvin What about the `run_deployment` function? Would it not be an option for the parent flow to call `run_deployment` to run the child flow, and rely on the `ConcurrencyLimitConfig` of the deployment?
m
thought for 98.3 seconds
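Yes, that is a reasonable option. When the parent calls `run_deployment`, each child run is a separate deployment run, so the limit can live on the child's deployment: either as a deployment concurrency limit (`concurrency_limit=2`, or a `ConcurrencyLimitConfig` whose default collision strategy enqueues extra runs, when you deploy or serve the child flow) or on the work queue the deployment uses. To set it on the work queue, use: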
```bash
prefect work-queue set-concurrency-limit "concurrent-queue" 2
```
Or programmatically:
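```python
import asyncio

from prefect import get_client
from prefect.exceptions import ObjectNotFound


async def setup_work_queue():
    async with get_client() as client:
        # Reuse the work queue if it exists, otherwise create it
        try:
            queue = await client.read_work_queue_by_name(
                "concurrent-queue", work_pool_name="your-work-pool"
            )
        except ObjectNotFound:
            queue = await client.create_work_queue(
                name="concurrent-queue",
                work_pool_name="your-work-pool",
            )

        # Set concurrency limit
        await client.update_work_queue(queue.id, concurrency_limit=2)


if __name__ == "__main__":
    asyncio.run(setup_work_queue())
```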
This approach has several advantages:
1. It keeps concurrency out of your flow code: the limit lives in your deployment infrastructure (the work queue).
2. It's easier to maintain: you can change the limit on the work queue without changing code.
3. It leverages Prefect's built-in deployment infrastructure.
4. You can monitor and manage the queue through the Prefect UI.

The work queue concurrency limit applies to every flow run in that queue, so as long as the child deployment is the only one using `concurrent-queue`, only 2 child runs execute at once while the others wait. Note that `run_deployment` waits for each run to finish by default (`timeout=None`), so a plain loop would start the children one at a time; pass `timeout=0` to have it return as soon as each run is created, so the parent creates all 10 runs and the queue enforces the limit. Would you like me to show you how to monitor the queue status or explain any part of this in more detail?