Riya Sinha
01/29/2025, 8:46 AM
```python
async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, 5):
    for model_id, model in models.items():
        await ingest_model(model_id, model)
```
Does this mean that only 5 will execute at a time? Or will all of them execute and only count toward the 5? This is a global concurrency limit.

Marvin
01/29/2025, 8:47 AM
The `concurrency` context manager in Prefect is used to control the maximum number of concurrent operations. In your code snippet, it ensures that only up to 5 operations can execute concurrently within the context: at any given time, only 5 instances of `ingest_model` will run concurrently, adhering to the specified concurrency limit.
This is a global concurrency limit, so it applies across all uses of the same concurrency context identified by `MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME`.
For more details, refer to the Prefect documentation on global concurrency limits.
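
For reference, here is a minimal sketch of what "global" means in practice: two different flows drawing from the same named, server-side slot pool. It assumes Prefect 3.x (or a recent 2.x), the `prefect.concurrency.asyncio` import path, and that a limit (e.g. 5 slots) has already been created on the server under an assumed name "model-registration"; the flow names here are made up for illustration.
```python
from prefect import flow
from prefect.concurrency.asyncio import concurrency

# Assumed: a global concurrency limit with this name (e.g. 5 slots)
# already exists on the Prefect server.
MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME = "model-registration"

@flow
async def register_models(models: dict[str, dict]):
    for model_id, model in models.items():
        # Each pass through the block occupies one slot of the shared,
        # server-side limit and releases it when the block exits.
        async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, occupy=1):
            ...  # do the work for this model

@flow
async def backfill_models(models: dict[str, dict]):
    # A different flow, even on a different worker, draws from the same pool,
    # because the limit is identified by name and enforced by the server.
    async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, occupy=1):
        ...  # backfill work
```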

Riya Sinha
01/29/2025, 9:18 AM
```python
@flow
async def model_preprocessing_flow(models: dict[str, dict[str, str]]):
    ingestion_coros = []
    for model_id, model in models.items():
        ingestion_coros.append(ingest_model_limited(model_id, model))
    results = await asyncio.gather(*ingestion_coros, return_exceptions=True)


async def ingest_model_limited(model_id, model):
    """
    Helper function to allow the flow to be wrapped in a concurrency block.
    """
    async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, 1):
        await ingest_model(model_id, model)


@flow
async def ingest_model(model_id, model: dict):
    ...
```
@Marvin Why does it initially respect the concurrency limit and launch only 5 subflows, but then, even though the 5 initial subflows are still running, the global concurrency "active" count drops to 0 and all of the remaining subflows launch?

Marvin
01/29/2025, 9:19 AM
A few things to check:
- Slot Decay: Review the concurrency limit's `slot_decay_per_second` parameter to ensure it's set as intended.
- Logging and Debugging: Add logging within the `concurrency` context to track when slots are acquired and released (see the sketch after this message). This might provide insights into unexpected slot releases.
- Prefect Version Issues: Ensure you're using a stable version of Prefect, as concurrency issues might be resolved in newer versions.
For more detailed guidance, refer to the Prefect documentation on global concurrency limits. If the problem persists, consider reaching out to the Prefect community or support for more tailored assistance.
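
A hedged sketch of the logging suggestion above, reusing `ingest_model`, `ingest_model_limited`, and the limit-name constant from the thread (the standard-library logger and log messages are illustrative, not Prefect APIs):
```python
import logging
from prefect.concurrency.asyncio import concurrency

logger = logging.getLogger(__name__)

async def ingest_model_limited(model_id, model):
    """Wrap one ingest in the global concurrency limit, logging slot usage."""
    logger.info("waiting for a slot: %s", model_id)
    async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, occupy=1):
        logger.info("slot acquired: %s", model_id)
        try:
            await ingest_model(model_id, model)
        finally:
            # Runs just before the context manager releases the slot, so the
            # timestamps show how long each slot was actually held.
            logger.info("releasing slot: %s", model_id)
```
Comparing these timestamps against the active-slot count you're watching should show whether slots are being freed before the ingests finish.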

Riya Sinha
01/29/2025, 9:21 AM

Marvin
01/29/2025, 9:21 AM
When `slot_decay_per_second` is configured, a slot can become available again over time, independent of whether the task that occupied it has completed. This can lead to a situation where more tasks are allowed to start if the decay rate is high enough.
This behavior is intended to create a "rate limiting" effect, allowing tasks to run more frequently but with controlled pauses between them. If you want the slot to be held for the entire duration of the task, make sure the slot decay is set to a value that aligns with your task execution time, or simply set it to 0 to disable decay.
For a more detailed explanation, refer to the Prefect documentation on global concurrency limits.
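
To make the two modes concrete, here is a hedged sketch contrasting them, reusing `ingest_model` and the limit-name constant from the thread. It assumes Prefect 3.x; `rate_limit` is the helper the Prefect docs pair with decaying limits, so verify the import path for your version.
```python
from prefect.concurrency.asyncio import concurrency, rate_limit

# Mode 1: cap work in flight. Use a limit with slot_decay_per_second unset (or 0);
# the slot is held until the block exits, i.e. until the ingest finishes.
async def ingest_model_capped(model_id, model):
    async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, occupy=1):
        await ingest_model(model_id, model)

# Mode 2: throttle how often work *starts*. Use a limit with
# slot_decay_per_second > 0; the slot is released by decay on the server,
# not when the task finishes, so the number of ingests in flight is not capped.
async def ingest_model_throttled(model_id, model):
    await rate_limit(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME)
    await ingest_model(model_id, model)
```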

Riya Sinha
01/29/2025, 9:22 AM

Marvin
01/29/2025, 9:22 AM
1. Disable Slot Decay: Ensure that the `slot_decay_per_second` parameter is set to 0 or is not configured. This makes the slot occupancy duration equal to the task execution time.
2. Verify Configuration: Double-check the concurrency limit configuration using the Prefect CLI or other configuration tools to ensure that no decay is inadvertently set.
With this in place, slots are held until the tasks complete, so the concurrency limit is respected throughout each task's execution.
For more information, review the Prefect documentation on configuring concurrency limits.
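
Putting the thread together, here is a hedged sketch of the full pattern that holds one slot per ingest for its entire duration. It assembles the code already shared above and assumes Prefect 3.x plus a server-side global concurrency limit of 5 slots, created under an assumed name "model-registration" with no slot decay.
```python
import asyncio
from prefect import flow
from prefect.concurrency.asyncio import concurrency

# Assumed: a server-side global concurrency limit of 5 slots with
# slot_decay_per_second left unset (or 0).
MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME = "model-registration"

@flow
async def ingest_model(model_id: str, model: dict):
    ...  # ingestion logic from earlier in the thread

async def ingest_model_limited(model_id: str, model: dict):
    # Each call holds one slot of the 5-slot limit for the whole subflow run,
    # so at most five ingests are in flight at any time.
    async with concurrency(MODEL_REGISTRATION_CONCURRENCY_LIMIT_NAME, occupy=1):
        await ingest_model(model_id, model)

@flow
async def model_preprocessing_flow(models: dict[str, dict[str, str]]):
    coros = [ingest_model_limited(mid, m) for mid, m in models.items()]
    return await asyncio.gather(*coros, return_exceptions=True)
```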

Riya Sinha
01/29/2025, 9:37 AM

Marvin
01/29/2025, 9:38 AM