# marvin-in-the-wild
o
@Marvin How do I ensure no more than two subflows are running at a time for a specific concurrency key? I tried the following but I’m seeing the subflows still only running one at a time:
```python
global_concurrency_limit = GlobalConcurrencyLimitCreate(
    name=concurrency_key,
    limit=MAX_CONCURRENT_JOBS_PER_CLUSTER,
    active=True,
)

await client.create_global_concurrency_limit(global_concurrency_limit)

async with concurrency(concurrency_key, occupy=1):
    logger.info(f"Submitting job for tenant {tenant}")
    await my_flow(tenant)
```
m
Based on your code snippet and the requirement to ensure no more than two subflows are running at the same time, there are a few key aspects that you should consider:

- Make sure you have properly imported and are using the correct `concurrency` context manager for either synchronous or asynchronous flows.
- Ensure that the concurrency limit (`MAX_CONCURRENT_JOBS_PER_CLUSTER`) is set to the desired value, which should be 2 in your case.
- Ensure the `active` parameter is set to `True` when creating the global concurrency limit, which seems to be correctly set in your code.
- Check that the concurrency limit is not being occupied by other tasks or flows that could be affecting the available slots.

Regarding the information extracted from the documentation, the use of the `concurrency` context manager should be straightforward. The examples provided in the documentation illustrate how to use this context manager properly, which is important for managing the concurrent execution of tasks or subflows. Make sure to follow these examples, adjusting them to the asynchronous nature of your flow if necessary.

Please review the following documentation for more details and examples on setting global concurrency limits and the use of the context manager:

- Global Concurrency Limits

If you continue to face issues with subflows running only one at a time despite these checks, it may be a bug or a configuration issue specific to your environment or Prefect version. Consider reaching out to the Prefect community or raising a GitHub issue in Prefect's repository if you suspect a bug.
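For reference, here is a minimal sketch of the async form of the context manager (assuming Prefect 2.x, where it lives in `prefect.concurrency.asyncio`; the limit name `"database"` is just illustrative):

```python
import asyncio

from prefect import flow
from prefect.concurrency.asyncio import concurrency


@flow
async def limited_flow():
    # Occupy one slot of the "database" global concurrency limit for the
    # duration of the block; if every slot is taken, this call waits here
    # until one frees up.
    async with concurrency("database", occupy=1):
        await asyncio.sleep(5)  # stand-in for the real work


if __name__ == "__main__":
    asyncio.run(limited_flow())
```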
o
@Marvin maybe I’m misunderstanding how to use this. But I expect the following to allow me to run 2 flows at the same time. But right now it runs sequentially. Note, I confirmed that the global concurrency control is active and has a limit of 2.
```python
import asyncio

from prefect import flow
from prefect.concurrency.asyncio import concurrency


@flow(name="Main Flow")
async def main_flow():
    for i in range(4):
        async with concurrency("omar_key", occupy=1):
            await sub_flow()

@flow(name="Sub Flow")
async def sub_flow():
    await asyncio.sleep(5)

if __name__ == '__main__':
    asyncio.run(main_flow())
```
I tried doing this with `sub_flow()` instead of `await sub_flow()` but I don’t see the subflows under the main flow in the UI.
n
your occupy is set to 1 here, which means that only 1 of any available slots can be occupied by work wrapped in this context
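also worth noting: the loop above awaits each subflow before starting the next one, so the runs are serialized by the loop itself regardless of the limit. A rough sketch of launching them concurrently and letting the limit do the throttling (assuming Prefect 2.x async subflows; `run_one` is just an illustrative helper):

```python
import asyncio

from prefect import flow
from prefect.concurrency.asyncio import concurrency


@flow(name="Sub Flow")
async def sub_flow():
    await asyncio.sleep(5)


@flow(name="Main Flow")
async def main_flow():
    async def run_one():
        # Each call occupies one slot; with the limit set to 2, at most two
        # sub_flow runs hold a slot at the same time.
        async with concurrency("omar_key", occupy=1):
            await sub_flow()

    # Start all four at once; the global concurrency limit, not the loop,
    # decides how many actually run in parallel.
    await asyncio.gather(*(run_one() for _ in range(4)))


if __name__ == "__main__":
    asyncio.run(main_flow())
```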
o
@Marvin I found this tucked away in the docs about task concurrency:
Flow run concurrency limits are set at a work pool and/or work queue level
While task run concurrency limits are configured via tags (as shown below), flow run concurrency limits are configured via work pools and/or work queues.
So I can either convert my flows to tasks and use tags or use work queues.
m
Yes, that's correct. There are different mechanisms for managing task-level and flow-level concurrency within Prefect.

For **task-level concurrency**, you use concurrency limits with tags, as mentioned in the section you found. By associating tasks with tags and setting concurrency limits for those tags, you can control how many task runs with that tag can execute concurrently. This configuration applies across all flows that use tasks with those tags.

For **flow-level concurrency**, Prefect allows you to set concurrency limits at the work pool and work queue level, where you can specify how many flow runs can execute simultaneously. This approach gives you control over flow concurrency based on where the flows are scheduled to run.

You have the following options:
1. **Convert Flows to Tasks**: If your subflows can be represented as tasks and they do not need the full capabilities of a flow, you can convert the subflows to tasks and then use concurrency limits with tags to limit how many task runs can execute concurrently.
2. **Use Work Queues**: You can create work queues with concurrency limits and assign your flows to these work queues. This allows you to regulate how many flow runs can execute concurrently when they are picked up by an agent from that specific work queue.

Here's more detailed information for each approach:
- Task Concurrency Limits
- Work Pool Concurrency
- Work Queues

Choosing between these two options depends on your specific use case and whether you want to control concurrency on the task level within your flows or on the flow level within your work pools/queues.
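To illustrate the first option, here is a rough sketch (assuming Prefect 2.x; the `"cluster-jobs"` tag and the tenant names are just placeholders) of a tagged task whose concurrency is capped by a tag-based limit set to 2:

```python
import asyncio

from prefect import flow, task


@task(tags=["cluster-jobs"])
async def submit_job(tenant: str):
    await asyncio.sleep(5)  # stand-in for the real work


@flow(name="Main Flow")
async def main_flow():
    tenants = ["tenant-a", "tenant-b", "tenant-c", "tenant-d"]
    # With a task run concurrency limit of 2 configured for the
    # "cluster-jobs" tag (e.g. `prefect concurrency-limit create cluster-jobs 2`),
    # at most two of these task runs execute at the same time, across all flows.
    await asyncio.gather(*(submit_job(t) for t in tenants))


if __name__ == "__main__":
    asyncio.run(main_flow())
```

The work queue option, by contrast, is configured on the work queue itself (via the UI or CLI) rather than in flow code, so there is no equivalent code change to make.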