Omar Khudeira
05/03/2024, 4:16 PM
Marvin
05/03/2024, 4:16 PM
`PrefectClient` `Running`
shell
# Create a concurrency limit named "subflow-limit" with a limit of 3 running tasks at the same time
prefect concurrency-limit create subflow-limit 3
# To apply this limit, you would then tag your tasks with "subflow-limit"
python
from prefect import flow, task
@task(tags=["subflow-limit"])
def subflow_task():
    """Placeholder task carrying the "subflow-limit" tag.

    The docstring gives the function a real body: a ``def`` followed only
    by a comment is an IndentationError.
    """
    # Task implementation goes here.
@flow
def main_flow():
    # Entry-point flow for the example; the real flow logic would go here.
    # The task is handed off through `.submit()` rather than called directly.
    subflow_task.submit()
# Run the main flow
if __name__ == "__main__":
    main_flow()
Omar Khudeira
05/03/2024, 4:18 PM
Marvin
05/03/2024, 4:19 PM
`PrefectClient` `PrefectClient`
python
from prefect import flow, task
from prefect.client import get_client
@task
def subflow_task():
    """Placeholder task submitted by ``main_flow``.

    The docstring gives the function a real body: a ``def`` followed only
    by a comment is an IndentationError.
    """
    # Task implementation goes here.
@flow
def main_flow():
    # Flow that first asks the Prefect client to create a concurrency limit
    # called "subflow-limit" (value 3) and then submits `subflow_task`.
    #
    # NOTE(review): the client is entered with a plain `with`; Prefect 2.x
    #   documents the object returned by `get_client()` as an async client
    #   (`async with` / `await`) - confirm this works inside a sync flow.
    # NOTE(review): the keyword for `create_concurrency_limit` is believed to
    #   be `tag`, not `name` - verify against the PrefectClient API reference.
    # NOTE(review): `subflow_task` in this snippet has no "subflow-limit" tag,
    #   so confirm the limit created here actually applies to it.
    # Set up the concurrency limit idempotently
    with get_client() as client:
        client.create_concurrency_limit(name="subflow-limit", concurrency_limit=3)
    # Flow implementation
    subflow_task.submit()
# Run the main flow
if __name__ == "__main__":
    main_flow()
`concurrency` `concurrency` `concurrency`
python
from prefect import flow, task
from prefect.concurrency.sync import concurrency
@task
def process_data(x, y):
    # Adds the two inputs while inside the "database" concurrency context,
    # requesting a single slot (occupy=1); the sum is returned once the
    # context has been left.
    with concurrency("database", occupy=1):
        total = x + y
    return total
@flow
def my_flow():
    # Fan out one `process_data` submission per hard-coded (x, y) pair.
    pairs = [(1, 2), (2, 3), (3, 4), (4, 5)]
    for left, right in pairs:
        process_data.submit(left, right)
if __name__ == "__main__":
    my_flow()
`prefect.concurrency.asyncio`
Omar Khudeira
05/03/2024, 4:21 PM
Marvin
05/03/2024, 4:21 PM
`concurrency` `PrefectClient` `PrefectClient` `PrefectClient`
python
from prefect import flow, task
from prefect.client import get_client
@task
def subflow_task():
    """Placeholder task submitted by ``main_flow``.

    The docstring gives the function a real body: a ``def`` followed only
    by a comment is an IndentationError.
    """
    # Task implementation goes here.
@flow
def main_flow():
    # Flow that calls a create-or-update style method on the client for a
    # limit called "subflow-limit" (value 3) and then submits `subflow_task`.
    #
    # NOTE(review): `create_or_update_concurrency_limit` is labelled
    #   hypothetical below and nothing in this snippet defines it - look up
    #   the real PrefectClient method before using this code.
    # NOTE(review): the client is entered with a plain `with`; Prefect 2.x
    #   documents the object returned by `get_client()` as an async client -
    #   confirm this works inside a sync flow.
    with get_client() as client:
        # Hypothetical API method to create or update the concurrency limit
        client.create_or_update_concurrency_limit("subflow-limit", limit=3)
    # Flow implementation
    subflow_task.submit()
# Run the main flow
if __name__ == "__main__":
    main_flow()
Omar Khudeira
05/03/2024, 4:25 PM
`from prefect.client import get_client` `from prefect import get_client`
Marvin
05/03/2024, 4:26 PM
`get_client` `from prefect import get_client` `get_client` `get_client()`
python
from prefect import flow, task, get_client
@task
def subflow_task():
    """Placeholder task submitted by ``main_flow``.

    The docstring gives the function a real body: a ``def`` followed only
    by a comment is an IndentationError.
    """
    # Task implementation goes here.
@flow
def main_flow():
    # Flow that calls a create-or-update style method on the client for a
    # limit called "subflow-limit" (value 3) and then submits `subflow_task`.
    #
    # NOTE(review): the client is entered with a plain `with`; Prefect 2.x
    #   documents the object returned by `get_client()` as an async client -
    #   confirm this works inside a sync flow.
    # NOTE(review): as the inline comment says, the method called below is
    #   pseudo-code; this flow is expected to fail with an AttributeError
    #   until it is replaced with a real client method.
    with get_client() as client:
        # Pseudo-code for creating/updating concurrency limit; this is not a real method
        # You would need to look at the PrefectClient documentation or source code
        # to find the correct method to create or update concurrency limits via the API
        client.create_or_update_concurrency_limit(name="subflow-limit", limit=3)
    # Flow implementation
    subflow_task.submit()
# Run the main flow
if __name__ == "__main__":
    main_flow()
`create_or_update_concurrency_limit`
Omar Khudeira
05/03/2024, 8:39 PM
Marvin
05/03/2024, 8:40 PM
`concurrency` `concurrency` `concurrency`
python
from prefect import flow, task
from prefect.concurrency.sync import concurrency
@task
def process_data(x, y):
    """Placeholder task that works inside the "data-processing-limit" context.

    Enters ``concurrency("data-processing-limit", occupy=1)`` around the
    (elided) processing step. The real work on ``x`` and ``y`` is left to
    the reader, so the function currently returns ``None``.
    """
    with concurrency("data-processing-limit", occupy=1):
        # ... process data
        pass  # keeps the elided body valid: a `with` needs at least one statement
@flow
def my_flow():
    # Submit `process_data` once for each hard-coded (x, y) input pair.
    input_pairs = [(1, 2), (2, 3), (3, 4), (4, 5)]
    for first, second in input_pairs:
        process_data.submit(first, second)
if __name__ == "__main__":
    my_flow()
`data-processing-limit` `process_data`