Omar Khudeira
05/02/2024, 9:19 PMprefect.concurrency.sync
operate within the same flow/sub-flow or does it block across independent flows?Marvin
05/02/2024, 9:20 PMprefect.concurrency.sync
operates based on concurrency limits that can be set globally. When using the concurrency context manager, the limit you use will be applied to the scope where you define it. As indicated in the documentation, if the concurrency limit doesn't exist, it will be created in an inactive state.
Here's a brief example using prefect.concurrency.sync
for a synchronous flow:
python
from prefect import flow, task
from prefect.concurrency.sync import concurrency
@task
def process_data(x, y):
with concurrency("database", occupy=1):
return x + y
@flow
def my_flow():
for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
process_data.submit(x, y)
if __name__ == "__main__":
my_flow()
The context manager with concurrency("database", occupy=1):
applies the concurrency limit to the process_data
task, which means it controls the concurrency for operations within the flow or sub-flow where it's defined. If multiple flows or sub-flows are using the same named limit, then they would be respecting that limit across those flows or sub-flows. The documentation does not explicitly state whether this concurrency limit blocks across completely independent flows, but given they are named global concurrency limits, it is implied that if independent flows also make use of the same named limit, they would also respect the limit globally.