Riya Sinha
01/29/2025, 7:35 AM
Marvin
01/29/2025, 7:35 AM
Riya Sinha
01/29/2025, 8:22 AM
```python
@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
async def model_preprocessing_flow(models: dict[str, dict[str, str]]):
    await create_and_wait_for_pvc()
    await create_and_wait_for_pvc_access_pod()
    ingestion_coros = []
    for model_id, model in models.items():
        ingestion_coros.append(ingest_model(model_id, model))
    await asyncio.gather(*ingestion_coros)


@flow
async def ingest_model(model_id, model: dict):
    async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, 1):
        ...
```
Fix the above code so that all the subflows are not created at once and then throttled; use the concurrency in the parent flow to limit the creation of child flows.
Marvin
01/29/2025, 8:23 AM
Your flow uses `ThreadPoolTaskRunner` to control the number of concurrent tasks being executed. However, to ensure that subflows are not all created at once, you need to manage the coroutine execution within the parent flow.
Here's a revised version of your code using the `ThreadPoolTaskRunner` to limit concurrency:
```python
from prefect import flow
from prefect.concurrency.asyncio import concurrency
from prefect.task_runners import ThreadPoolTaskRunner


@flow(task_runner=ThreadPoolTaskRunner(max_workers=3))
async def model_preprocessing_flow(models: dict[str, dict[str, str]]):
    await create_and_wait_for_pvc()
    await create_and_wait_for_pvc_access_pod()
    # Each subflow is awaited before the next one is created.
    for model_id, model in models.items():
        await ingest_model(model_id, model)


@flow
async def ingest_model(model_id, model: dict):
    async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, 1):
        ...  # Your ingestion logic here
```
### Key Changes:
- Task Runner Configuration: Set `max_workers` to 3 in `ThreadPoolTaskRunner` to limit the number of concurrent subflows.
- Sequential Execution: Use a loop to sequentially await each `ingest_model` call within the `model_preprocessing_flow`. This prevents all subflows from being created at once.
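The effect of awaiting inside the loop can be seen in a minimal sketch that uses plain `asyncio` and no Prefect at all. `PeakTracker` and `fake_ingest` are hypothetical stand-ins for `ingest_model`, and the sleep only simulates work:

```python
import asyncio


class PeakTracker:
    """Records the highest number of jobs that were in flight at once."""

    def __init__(self) -> None:
        self.active = 0
        self.peak = 0

    async def fake_ingest(self, model_id: str) -> str:
        # Hypothetical stand-in for ingest_model; the sleep simulates work.
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.01)
        self.active -= 1
        return model_id


async def run_sequentially(model_ids: list[str]) -> int:
    tracker = PeakTracker()
    for model_id in model_ids:
        # The next call starts only after this one has finished.
        await tracker.fake_ingest(model_id)
    return tracker.peak


async def run_with_gather(model_ids: list[str]) -> int:
    tracker = PeakTracker()
    # All coroutines are scheduled together and overlap while sleeping.
    await asyncio.gather(*(tracker.fake_ingest(m) for m in model_ids))
    return tracker.peak


sequential_peak = asyncio.run(run_sequentially(["a", "b", "c", "d"]))
gathered_peak = asyncio.run(run_with_gather(["a", "b", "c", "d"]))
print(sequential_peak, gathered_peak)
```

In this sketch the sequential version never has more than one job in flight, while the gathered version starts all four at once.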
This setup ensures that the parent flow controls the number of subflows being executed concurrently. For more details, you can refer to the Prefect documentation on task runners.
Riya Sinha
01/29/2025, 8:25 AM
Marvin
01/29/2025, 8:25 AM
Using `await` directly in a loop will execute the subflows sequentially, not in parallel. To achieve parallel execution with a limit on concurrency, you can indeed use global concurrency limits or manage the coroutines differently.
Here's how you can adjust your code to use global concurrency control more effectively:
1. Global Concurrency Limit: Use Prefect's global concurrency limits to restrict the number of subflows running concurrently.
2. Concurrency Management: Use `asyncio.Semaphore` to manage concurrency within the parent flow.
Here's how you can modify the code:
```python
import asyncio

from prefect import flow
from prefect.concurrency.asyncio import concurrency
from prefect.task_runners import ThreadPoolTaskRunner


@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
async def model_preprocessing_flow(models: dict[str, dict[str, str]]):
    await create_and_wait_for_pvc()
    await create_and_wait_for_pvc_access_pod()

    semaphore = asyncio.Semaphore(3)  # Limit to 3 concurrent subflows

    async def limited_ingest_model(model_id, model):
        # The subflow is only called once a semaphore slot is free.
        async with semaphore:
            await ingest_model(model_id, model)

    ingestion_coros = [
        limited_ingest_model(model_id, model) for model_id, model in models.items()
    ]
    await asyncio.gather(*ingestion_coros)


@flow
async def ingest_model(model_id, model: dict):
    async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, 1):
        ...  # Your ingestion logic here
```
### Key Changes:
- Semaphore for Concurrency: Use `asyncio.Semaphore` to limit the number of concurrent subflows to 3.
- Parallel Execution: Use `asyncio.gather` to execute subflows in parallel within the limits set by the semaphore.
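The semaphore pattern can be checked in isolation with plain `asyncio`. The sketch below assumes a hypothetical `fake_ingest` coroutine in place of the `ingest_model` subflow and counts how many jobs are in flight; it does not call Prefect, so it only illustrates the throttling logic:

```python
import asyncio


async def ingest_with_limit(model_ids: list[str], limit: int) -> tuple[list[str], int]:
    """Run one job per model id, never more than `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def fake_ingest(model_id: str) -> str:
        # Hypothetical stand-in for the ingest_model subflow.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # simulated work
        active -= 1
        return f"ingested {model_id}"

    async def limited(model_id: str) -> str:
        # fake_ingest is only called after a slot has been acquired,
        # so the child does not start while the limit is reached.
        async with semaphore:
            return await fake_ingest(model_id)

    results = await asyncio.gather(*(limited(m) for m in model_ids))
    return list(results), peak


results, peak = asyncio.run(
    ingest_with_limit([f"model-{i}" for i in range(10)], limit=3)
)
print(peak, len(results))
```

The wrappers for all ten models are created up front, but each one calls its child only after acquiring the semaphore, so at most three children are in flight at any moment.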
This approach allows the parent flow to handle subflow concurrency more effectively while adhering to the global concurrency constraints. For more details on concurrency limits, refer to Prefect's documentation on global concurrency limits.
Riya Sinha
01/29/2025, 8:27 AM
Marvin
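01/29/2025, 8:27 AM
```python
from prefect import flow, tags
from prefect.concurrency.asyncio import concurrency


@flow
async def ingest_model(model_id, model: dict):
    # Runs created inside this block carry the "subflow-limit" tag.
    with tags("subflow-limit"):
        async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, 1):
            ...  # Your ingestion logic here
```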
2. Set Concurrency Limit Using CLI:
- Use the Prefect CLI to set a concurrency limit on the tag:
```bash
# The tag and the limit are positional arguments.
prefect concurrency-limit create subflow-limit 3
```
This approach sets a global limit on the tag. Note that Prefect enforces tag-based concurrency limits on task runs, so the limit applies to tasks that carry the `subflow-limit` tag rather than to the subflow runs themselves. For more information on setting up concurrency limits, refer to the Prefect documentation on concurrency limits.
This configuration is external to the flow code and applies globally, ensuring that only the specified number of task runs with the designated tag can run concurrently across your environment.