<@ULVA73B9P> does the concurrency control in `pref...
# marvin-in-the-wild
o
@Marvin does the concurrency control in `prefect.concurrency.sync` operate within the same flow/sub-flow or does it block across independent flows?
m
The concurrency control in `prefect.concurrency.sync` is based on named concurrency limits that are defined globally. When you use the `concurrency` context manager, the limit applies to the code inside the `with` block, wherever that block runs. As indicated in the documentation, if the named limit doesn't exist, it will be created in an inactive state, and an inactive limit is not enforced until it is activated. Here's a brief example using `prefect.concurrency.sync` in a synchronous flow:
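```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def process_data(x, y):
    # Hold one slot of the "database" limit while this block runs.
    with concurrency("database", occupy=1):
        return x + y

@flow
def my_flow():
    # Submit the tasks so they can run concurrently, then wait for them
    # so the flow does not finish before its tasks do.
    futures = [
        process_data.submit(x, y)
        for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]
    ]
    for future in futures:
        future.wait()

if __name__ == "__main__":
    my_flow()
```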
The context manager `with concurrency("database", occupy=1):` takes one slot of the `database` limit for as long as the block inside the `process_data` task is running. The limit is attached to the name, not to the flow or sub-flow where the block appears, so any flows or sub-flows that use the same named limit share its slots. The documentation does not explicitly state whether this blocks across completely independent flows, but because these are global concurrency limits keyed by name, independent flows that use the same name should also respect the limit, provided they talk to the same Prefect server or workspace.