<@ULVA73B9P> I have a prefect flow that can be tri...
# ask-marvin
e
@Marvin I have a prefect flow that can be triggered by a user of my webapp. However, I want to set it so that each user can have up to x number of that deployment running at any given time. How can I use the user's identifier to make the Flow Runs wait?
m
It seems my initial search didn't provide relevant information about setting concurrency limits based on a user identifier within Prefect. However, a common approach would involve the use of a lock or semaphore system, where you can track the number of runs per user and implement waiting logic. If Prefect doesn't offer an out-of-the-box solution for this specific use case, custom logic may need to be implemented within your application to track this state. For a start, I would check if it's possible to leverage Prefect's state handlers or the result backend to implement this feature. I'll refine the search to see if there's more specific guidance on how to achieve this in Prefect.
I found some information that might point us in the right direction. In Prefect, you could potentially define orchestration policies that include limits on concurrent workflows, which would block entry into a Running state until those conditions are met. It suggests that a user could define rules to control the state transitions of a flow based on certain conditions. Here is the link to a bit of information about orchestrator behavior: Second Generation Workflow Engine.
There's also a feature for global concurrency limits, which can be used to ensure that certain parts of your flow do not exceed a predefined level of concurrent operations. This feature could potentially be adapted or serve as inspiration to implement user-based concurrency limits within your application. Here's the related link: Global Concurrency Limits.
While these resources don't offer an out-of-the-box solution for setting concurrency limits based on a user identifier, they do suggest that custom logic in your application to track and limit concurrent flow runs based on user identifiers is feasible. The use of custom state handlers or the result backend may assist in implementing such logic.
If these approaches do not fully answer your question or do not align perfectly with your needs, I would recommend reaching out to the Prefect community or support for more specific guidance.
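The "semaphore per user" idea mentioned above can be sketched in plain asyncio, without Prefect. This is only an illustration of the waiting logic: the `UserLimiter` class, the user ids, and the limit of 2 are hypothetical, and an in-memory semaphore only coordinates work inside a single process, so it is not a drop-in solution for separate flow runs.

```python
import asyncio
from collections import defaultdict


class UserLimiter:
    """Hypothetical helper: one asyncio.Semaphore per user id (single process only)."""

    def __init__(self, max_per_user: int) -> None:
        # A semaphore is created lazily the first time a user id is seen.
        self._semaphores = defaultdict(lambda: asyncio.Semaphore(max_per_user))
        self.active = defaultdict(int)  # jobs currently doing work, per user
        self.peak = defaultdict(int)    # highest concurrency observed, per user

    async def run(self, user_id: str, seconds: float) -> None:
        # Waits here while the user already has max_per_user jobs in progress.
        async with self._semaphores[user_id]:
            self.active[user_id] += 1
            self.peak[user_id] = max(self.peak[user_id], self.active[user_id])
            try:
                await asyncio.sleep(seconds)  # stand-in for the real work
            finally:
                self.active[user_id] -= 1


async def demo() -> dict[str, int]:
    limiter = UserLimiter(max_per_user=2)
    jobs = [limiter.run("alice", 0.05) for _ in range(5)]
    jobs.append(limiter.run("bob", 0.05))
    await asyncio.gather(*jobs)
    return dict(limiter.peak)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each user is limited independently: five jobs for one user never exceed two at a time, while another user's job is not held up by them.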
n
yeah you could use the global concurrency context manager with some user identifier as the name of the limit
e
Would this keep prefect run flows in Waiting state? @Nate
n
not the context manager, it just wouldn't do whatever is in the context until you have slots available
would these customer flows all run on the same infra or would you have separate deployments for each?
e
Err the same job types would run on the same deployment
so same infra
n
gotcha so im imagining something like this
import asyncio

from prefect import flow
from prefect.concurrency.asyncio import concurrency

CUSTOMER_LIMIT_BY_ID: dict[str, int] = {
    "customer_1": 1,
}


async def do_customer_work(customer_id: str) -> None:
    print(f"Working on customer {customer_id}")
    await asyncio.sleep(5)


@flow
async def customer_flow(customer_id: str) -> None:
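    # Blocks until the global concurrency limit named after the customer has free slots.
    # Note: `occupy` is the number of slots this run takes, not the limit itself;
    # the limit is set on the global concurrency limit (e.g. in the UI).
    async with concurrency(customer_id, occupy=CUSTOMER_LIMIT_BY_ID[customer_id]):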
        await do_customer_work(customer_id)


@flow
async def delegating_flow(customer_ids: list[str]) -> None:
    await asyncio.gather(*[customer_flow(customer_id) for customer_id in customer_ids])


if __name__ == "__main__":
    asyncio.run(delegating_flow(["customer_1"] * 3))
where in this case I just went into the UI to activate my global concurrency limit called customer_1, but you can use this to do it programmatically
e
Hm. So this would mean all the flow runs would be running, just not doing anything right?
n
yeah i guess it would as written
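To make the point above concrete, here is a small simulation in plain asyncio. It is not Prefect's state machine: the semaphore is a stand-in for a limit of 1 on one customer, and `customer_run` and the log messages are hypothetical. It only shows that every run starts right away and then waits inside its body for a slot.

```python
import asyncio


async def customer_run(run_id: int, slot: asyncio.Semaphore, log: list[str]) -> None:
    log.append(f"run {run_id}: started")  # the run has begun executing
    async with slot:                      # ...but waits here for a free slot
        log.append(f"run {run_id}: working")
        await asyncio.sleep(0.05)         # stand-in for the real work
        log.append(f"run {run_id}: done")


async def demo() -> list[str]:
    slot = asyncio.Semaphore(1)  # stand-in for a limit of 1 for one customer
    log: list[str] = []
    await asyncio.gather(*(customer_run(i, slot, log) for i in range(3)))
    return log


if __name__ == "__main__":
    for entry in asyncio.run(demo()):
        print(entry)
```

All three runs log "started" before the first one finishes, which mirrors the concern that the runs would be running but idle rather than held in a waiting state.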
e
Ok. I will put some more thought into it, thank you
n
👍