Omar Khudeira
05/07/2024, 2:26 PMglobal_concurrency_limit = GlobalConcurrencyLimitCreate(
name = concurrency_key,
limit = MAX_CONCURRENT_JOBS_PER_CLUSTER,
active = True,
)
await client.create_global_concurrency_limit(global_concurrency_limit)
async with concurrency(concurrency_key, occupy=1):
logger.info(f"Submitting job for tenant"
await my_flow(tenant)
Marvin
05/07/2024, 2:26 PMconcurrency
context manager for either synchronous or asynchronous flows.
- Ensure that the concurrency limit (MAX_CONCURRENT_JOBS_PER_CLUSTER
) is set to the desired value, which should be 2 in your case.
- Ensure the active
parameter is set to True
when creating the global concurrency limit, which seems to be correctly set in your code.
- Check if the concurrency limit is not being occupied by other tasks or flows that could be affecting the available slots.
Regarding the information extracted from the documentation, the use of the concurrency
context manager should be straightforward. The examples provided in the documentation illustrate how to use this context manager properly, which is important for managing the concurrent execution of tasks or subflows. Make sure to follow these examples, adjusting them to the asynchronous nature of your flow if necessary.
Please review the following documentation for more details and examples on setting global concurrency limits and the use of the context manager:
- Global Concurrency Limits
If you continue to face issues with subflows running only one at a time despite these checks, it may be a bug or a configuration issue specific to your environment or Prefect version. Consider reaching out to the Prefect community or raising a GitHub issue with Prefect's repository if you suspect a bug.Omar Khudeira
05/07/2024, 2:45 PM@flow(name="Main Flow")
async def main_flow():
for i in range(4):
async with concurrency("omar_key", occupy=1):
await sub_flow()
@flow(name="Sub Flow")
async def sub_flow():
await asyncio.sleep(5)
if __name__ == '__main__':
asyncio.run(main_flow())
I tried doing this with subflow()
instead of await subflow()
but I don’t see the subflows under the main flow in the UI.Nate
05/07/2024, 3:06 PMOmar Khudeira
05/07/2024, 5:41 PMFlow run concurrency limits are set at a work pool and/or work queue level
While task run concurrency limits are configured via tags (as shown below), flow run concurrency limits are configured via work pools and/or work queues.So I can either convert my flows to tasks and use tags or use work queues.
Marvin
05/07/2024, 5:41 PM