# ask-marvin
r
@Marvin how can i limit the concurrency of a subflow? like i want the parent flow to only launch 3 of the subflow at a time globally
m
To limit the concurrency of a subflow in Prefect 3.x, the right mechanism depends on how the subflow runs:

1. Global concurrency limits: Create a named limit, for example with 3 slots, through the Prefect CLI, API or UI. Then occupy a slot in your code with the `concurrency` context manager. The limit is stored on the Prefect server, so it applies to every flow run that uses the same name. This includes subflows called directly from a parent flow.
2. Work pool or work queue concurrency limits: These cap how many flow runs a worker picks up from a pool or queue. They only affect flow runs launched from deployments, not subflows the parent calls in-process.
3. Tag-based concurrency limits: These apply to task runs that carry a given tag, not to flow runs. Tagging a subflow therefore does not throttle it.

For more detailed instructions, refer to the Prefect documentation on global concurrency limits and task run limits. If you need to prioritize flow runs, you can also configure work queues with different priorities, as explained in the work pools documentation.
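The following Prefect-free toy model makes the idea of a named limit concrete. Each name has a fixed number of slots, and a holder occupies some of them. Anyone who would push the count past the limit waits. `NamedLimits` and its `occupy` method are hypothetical and only mimic the shape of Prefect's `concurrency(name, occupy=...)`. Unlike the real feature, this model lives in a single process and is not shared across flow runs on different machines.

```python
import asyncio
from contextlib import asynccontextmanager


class NamedLimits:
    """Toy, in-process stand-in for named concurrency limits (hypothetical API)."""

    def __init__(self, limits: dict[str, int]) -> None:
        self._limits = dict(limits)
        self._active = {name: 0 for name in limits}
        self.peak = {name: 0 for name in limits}
        self._cond = asyncio.Condition()

    @asynccontextmanager
    async def occupy(self, name: str, slots: int = 1):
        if slots > self._limits[name]:
            raise ValueError(f"limit {name!r} can never grant {slots} slots")
        async with self._cond:
            # Block until taking `slots` more would stay within the limit
            await self._cond.wait_for(
                lambda: self._active[name] + slots <= self._limits[name]
            )
            self._active[name] += slots
            self.peak[name] = max(self.peak[name], self._active[name])
        try:
            yield
        finally:
            # Give the slots back and wake any waiters
            async with self._cond:
                self._active[name] -= slots
                self._cond.notify_all()


async def main() -> dict[str, int]:
    limits = NamedLimits({"model-registration": 3})

    async def job() -> None:
        async with limits.occupy("model-registration", slots=1):
            await asyncio.sleep(0.01)  # pretend to do some work

    await asyncio.gather(*(job() for _ in range(10)))
    return limits.peak


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Running it reports a peak of 3 for `model-registration`, even though all 10 jobs are started together.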
r
@Marvin
```python
import asyncio
from prefect import flow
from prefect.concurrency.asyncio import concurrency
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
async def model_preprocessing_flow(models: dict[str, dict[str, str]]):
    await create_and_wait_for_pvc()
    await create_and_wait_for_pvc_access_pod()
    ingestion_coros = []
    for model_id, model in models.items():
        ingestion_coros.append(ingest_model(model_id, model))
    await asyncio.gather(*ingestion_coros)

@flow
async def ingest_model(model_id, model: dict):
    async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, 1):
        ...  # ingestion logic (not shown in the original message)
```
Fix the above code so that the subflows are not all created at once and then throttled; use the concurrency in the parent flow to limit the creation of child flows
m
The `ThreadPoolTaskRunner` controls how many tasks submitted with `.submit()` or `.map()` run concurrently, but it does not govern subflow calls. To keep the subflows from all being created at once, you need to manage how the parent flow awaits the subflow coroutines. Here's a revised version of your code that awaits each subflow in turn:
```python
from prefect import flow
from prefect.concurrency.asyncio import concurrency
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=3))  # bounds submitted tasks only, not subflow calls
async def model_preprocessing_flow(models: dict[str, dict[str, str]]):
    await create_and_wait_for_pvc()
    await create_and_wait_for_pvc_access_pod()
    for model_id, model in models.items():
        # Each subflow finishes before the next one starts
        await ingest_model(model_id, model)

@flow
async def ingest_model(model_id, model: dict):
    async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, 1):
        ...  # your ingestion logic here
```
### Key Changes:
- Task Runner Configuration: `max_workers` is set to 3 in `ThreadPoolTaskRunner`. This only bounds tasks submitted through the task runner and does not limit how many subflows run.
- Sequential Execution: The loop awaits each `ingest_model` call within `model_preprocessing_flow` in turn. This keeps all the subflows from being created at once, but it also means only one subflow runs at a time.

For more details, refer to the Prefect documentation on task runners.
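You can see what a sequential `await` loop does by replacing the subflow with a stand-in coroutine and recording how many are running at once. In this Prefect-free sketch, `fake_ingest` is a hypothetical stand-in for `ingest_model`. No task runner is involved, which matches the fact that a task runner's `max_workers` does not apply to subflow calls.

```python
import asyncio


class PeakCounter:
    """Tracks how many stand-in 'subflows' are running at the same moment."""

    def __init__(self) -> None:
        self.running = 0
        self.peak = 0


async def fake_ingest(counter: PeakCounter) -> None:
    # Hypothetical stand-in for the ingest_model subflow
    counter.running += 1
    counter.peak = max(counter.peak, counter.running)
    await asyncio.sleep(0.01)
    counter.running -= 1


async def sequential(n: int) -> int:
    counter = PeakCounter()
    for _ in range(n):
        await fake_ingest(counter)  # the next call starts only after this one finishes
    return counter.peak


async def unbounded(n: int) -> int:
    counter = PeakCounter()
    await asyncio.gather(*(fake_ingest(counter) for _ in range(n)))
    return counter.peak


print(asyncio.run(sequential(10)), asyncio.run(unbounded(10)))
```

The sequential loop peaks at 1. An unbounded `asyncio.gather` peaks at the number of calls.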
r
@Marvin but you’re awaiting each subflow, so they execute sequentially, not in parallel, right? why can't you use the global concurrency limit in the parent?
m
You're correct that using `await` directly in a loop executes the subflows sequentially, not in parallel. To get parallel execution with a cap on concurrency, there are two options:

1. Global Concurrency Limit: Use a Prefect global concurrency limit, which restricts how many subflows run concurrently across all flow runs.
2. Concurrency Management: Use `asyncio.Semaphore` to manage concurrency within a single parent flow run.

Here's how you can modify the code using the semaphore:
```python
import asyncio
from prefect import flow
from prefect.concurrency.asyncio import concurrency
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
async def model_preprocessing_flow(models: dict[str, dict[str, str]]):
    await create_and_wait_for_pvc()
    await create_and_wait_for_pvc_access_pod()
    
    semaphore = asyncio.Semaphore(3)  # Limit to 3 concurrent subflows within this parent run

    async def limited_ingest_model(model_id, model):
        async with semaphore:
            # The subflow is only called (and its run created) once a permit is held
            await ingest_model(model_id, model)

    ingestion_coros = [
        limited_ingest_model(model_id, model) for model_id, model in models.items()
    ]
    await asyncio.gather(*ingestion_coros)

@flow
async def ingest_model(model_id, model: dict):
    async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, 1):
        ...  # your ingestion logic here
```
### Key Changes:
- Semaphore for Concurrency: `asyncio.Semaphore` limits the number of concurrent subflows to 3.
- Parallel Execution: `asyncio.gather` runs the subflows in parallel, within the limit set by the semaphore.

The semaphore only applies within a single parent flow run. If several parent runs execute at the same time, each has its own semaphore, so the cap of 3 is not global. The `concurrency` block inside `ingest_model` still enforces the global limit, but only after each subflow run has already been created. For more details on concurrency limits, refer to Prefect's documentation on global concurrency limits.
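The original question contrasts two orderings. Taking the limit inside `ingest_model` creates every subflow run first and then throttles it. Taking the limit in the parent happens before the subflow is called. This Prefect-free sketch models both. It assumes an `asyncio.Semaphore(3)` as a stand-in for the concurrency limit and a counter increment as a stand-in for Prefect creating a subflow run. It reports how many runs exist when the first one finishes.

```python
import asyncio


async def simulate(throttle_in_parent: bool, n: int = 10, limit: int = 3) -> int:
    """Return how many stand-in subflow runs exist when the first one finishes."""
    slots = asyncio.Semaphore(limit)  # stand-in for the concurrency limit
    created = 0
    created_at_first_finish = 0

    async def subflow() -> None:
        nonlocal created, created_at_first_finish
        created += 1  # stand-in for Prefect creating the subflow run
        if throttle_in_parent:
            await asyncio.sleep(0.01)  # the caller already holds a slot
        else:
            async with slots:  # the run already exists while it waits here
                await asyncio.sleep(0.01)
        if created_at_first_finish == 0:
            created_at_first_finish = created

    async def parent_call() -> None:
        if throttle_in_parent:
            async with slots:  # take a slot first, then create the run
                await subflow()
        else:
            await subflow()

    await asyncio.gather(*(parent_call() for _ in range(n)))
    return created_at_first_finish


print("limit taken inside the subflow:", asyncio.run(simulate(throttle_in_parent=False)))
print("limit taken in the parent:     ", asyncio.run(simulate(throttle_in_parent=True)))
```

With the limit inside the subflow, all 10 stand-in runs exist before any of them finishes. With the limit taken in the parent, only 3 do.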
r
@Marvin only use the global concurrency limit. is this not possible with prefect?
m
Yes, this is possible with the global concurrency limit alone. Global concurrency limits live on the Prefect server and are occupied directly in flow code with the `concurrency` context manager. The parent flow can therefore take a slot before it calls each subflow. Because the subflow is only called once a slot is held, its run is not created until a slot is free. Tag-based concurrency limits (`prefect concurrency-limit create`) are a different feature. They apply to task runs only, so tagging a subflow does not throttle it.

### Steps to Configure:
1. Create the limit: Use the Prefect CLI or the UI to create a global concurrency limit with 3 slots. In the command below, `<limit-name>` is the value of `MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME`:
```bash
prefect gcl create <limit-name> --limit 3
```
2. Occupy the limit in the parent: Move the `concurrency` block out of `ingest_model` and wrap each subflow call in the parent instead:
```python
import asyncio
from prefect import flow
from prefect.concurrency.asyncio import concurrency

@flow
async def ingest_model(model_id, model: dict):
    ...  # your ingestion logic here; no concurrency block needed inside the subflow

@flow
async def model_preprocessing_flow(models: dict[str, dict[str, str]]):
    async def limited_ingest_model(model_id, model):
        async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, occupy=1):
            await ingest_model(model_id, model)  # subflow run is created only after a slot is held
    await asyncio.gather(*(limited_ingest_model(m_id, m) for m_id, m in models.items()))
```
Every parent flow run that uses the same limit name shares the same 3 slots. At most 3 `ingest_model` subflows run at once across your environment, and each subflow run is created only when a slot becomes free. For more information, refer to the Prefect documentation on global concurrency limits.
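The word "globally" in the original question is what separates a semaphore from a global concurrency limit. An `asyncio.Semaphore` created inside the parent flow is private to that parent run. This Prefect-free sketch runs two hypothetical parent runs at the same time. It does this once with a semaphore per parent and once with a single shared semaphore that stands in for the server-side global limit. A shared in-process object only works within one process. Across processes and machines, that shared role belongs to the global concurrency limit stored on the Prefect server.

```python
import asyncio


async def run_parents(shared: bool, parents: int = 2, per_parent: int = 5, limit: int = 3) -> int:
    """Return the peak number of stand-in subflows running at once across all parents."""
    running = 0
    peak = 0
    shared_slots = asyncio.Semaphore(limit)  # stand-in for one global limit

    async def subflow() -> None:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1

    async def parent() -> None:
        # Each parent either reuses the shared limit or creates its own local one
        slots = shared_slots if shared else asyncio.Semaphore(limit)

        async def limited() -> None:
            async with slots:
                await subflow()

        await asyncio.gather(*(limited() for _ in range(per_parent)))

    await asyncio.gather(*(parent() for _ in range(parents)))
    return peak


print("one semaphore per parent:", asyncio.run(run_parents(shared=False)))
print("one shared limit:        ", asyncio.run(run_parents(shared=True)))
```

With one semaphore per parent, the peak is 6 (3 per parent). With the shared limit, it stays at 3.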