# ask-community
r
Hey all, I'm seeing some odd behavior when running a deployment that triggers other deployments. I want to run on an ECS cluster with 50 workers in parallel over 400 IDs. However, I get API rate limit errors even though I use
prefect.concurrency.sync.rate_limit()
What is the best way to set up the flow so that 50 concurrent runs are possible? Or are the rate limits so strict that it isn't possible? PS: We are currently still on Prefect 2; would switching to Prefect 3 reduce these issues? ### Context: I tried different options from the concurrency documentation:
Copy code
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.concurrency.sync import concurrency, rate_limit

@task
def run_calibration(...):
    # Hold one slot of the shared limit for the duration of the sub-deployment run
    with concurrency("ECS Global Concurrency", occupy=1):
        run_deployment(...)

@flow
def run_calibrations_map(...):
    run_calibration.map(parameters)

@flow
def run_calibrations_submit(...):
    for param in parameters:
        run_calibration.submit(param)
I also tried rate_limit instead of the concurrency context, but neither works stably 🤔 Yesterday it ran through once with run_calibrations_map and the concurrency context manager, but today it doesn't anymore...
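For reference, this is how I understand the two primitives to differ, based on the concurrency docs (the limit name is the one from my snippet above):
Copy code
from prefect.concurrency.sync import concurrency, rate_limit

# Holds one slot for the whole block and releases it on exit, so at most
# `limit` blocks run at the same time
with concurrency("ECS Global Concurrency", occupy=1):
    ...

# Acquires a slot and returns immediately; slots are not released on exit
# but replenish via the limit's configured slot decay, which is why
# rate_limit requires a limit with slot decay
rate_limit("ECS Global Concurrency", occupy=1)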
OK, I think I have tried almost all combinations by now, including the example for batched submission, but whenever I submit a deployment that would trigger a high number of "sub-deployment runs", the subflow runs don't even start. In all combinations the deployments themselves ran fine, although the concurrency had somewhat buggy behavior...
Copy code
import asyncio

# async variant of the concurrency context manager
from prefect.concurrency.asyncio import concurrency

processed_data = []
while data_items:
    async with concurrency("data-processing", occupy=n_parallel_calibrations_runs):
        # Guard with min() so the last batch doesn't pop from an empty list
        chunk = [
            data_items.pop()
            for _ in range(min(n_parallel_calibrations_runs, len(data_items)))
        ]
        processed_data += await asyncio.gather(
            *[
                run_calibration(
                    system_id=item[0],
                    from_time=item[1],
                    to_time=item[2],
                    run_in_cloud=run_in_cloud,
                )
                for item in chunk
            ]
        )
This seems odd, is there something I'm missing?
only 20 task runs are submitted at a time
but 0 flow runs are submitted
n
hey @Robin
> I also tried rate_limit instead of the concurrency context, but neither works stably
do you have a minimal example of what you mean?
r
This is not entirely a minimal example, but the idea is to have two Docker deployments, where one triggers >400 runs of the second, executed on an AWS ECS cluster with 50 deployment runs in parallel:
Copy code
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.concurrency.sync import concurrency, rate_limit

@task
def run_calibration(...):
    # Hold one slot of the shared limit for the duration of the sub-deployment run
    with concurrency("ECS Global Concurrency", occupy=1):
        run_deployment(...)

@flow
def run_calibrations_map(...):
    run_calibration.map(parameters)

@flow
def run_calibrations_submit(...):
    for param in parameters:
        run_calibration.submit(param)
Example with rate_limit:
Copy code
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.concurrency.sync import concurrency, rate_limit

@task
def run_calibration(...):
    # rate_limit needs the name of the configured limit to throttle against
    rate_limit("ECS Global Concurrency")
    run_deployment(...)

@flow
def run_calibrations_map(...):
    run_calibration.map(parameters)

@flow
def run_calibrations_submit(...):
    for param in parameters:
        run_calibration.submit(param)
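One thing I am not sure we have set up correctly: as far as I understand the docs, rate_limit only works against a limit that has slot decay configured, so the limit itself needs to be created accordingly; a sketch of the call, with how the limit might be created in the comments (the decay value is just a guess):
Copy code
from prefect.concurrency.sync import rate_limit

# The named limit must already exist and have slot decay configured, e.g.
# created in the UI or (on newer versions) via the CLI:
#   prefect gcl create "ECS Global Concurrency" --limit 50 --slot-decay-per-second 1.0
# Without decay, slots acquired by rate_limit never free up and runs stall.
rate_limit("ECS Global Concurrency", occupy=1)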
j
@Robin I have tried rate_limit in this type of setup and ran into issues. I had registered the concurrency limits and all, but it didn't respect them. After recreating the concurrency limit, it worked as expected. Have you tried that? Also, I run the subflows like this:
Copy code
import asyncio

from prefect.deployments import run_deployment

flow_tasks = [
    run_deployment(
        "abc/ABC",
        parameters=dict(),
        work_queue_name="xyz",
        as_subflow=True,
    )
    for _ in range(10)
]

# In an async context, run_deployment returns awaitables, so gather
# runs all ten deployments concurrently
runs = await asyncio.gather(*flow_tasks)
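If you want to cap in-flight runs on the client side instead of relying on server-side limits, a semaphore sketch like this might also work (deployment name and cap are placeholders):
Copy code
import asyncio

from prefect.deployments import run_deployment

async def run_all(parameters_list, max_parallel=50):
    # Client-side cap: at most max_parallel run_deployment calls in flight
    sem = asyncio.Semaphore(max_parallel)

    async def run_one(params):
        async with sem:
            return await run_deployment(
                "abc/ABC",  # placeholder deployment name, as above
                parameters=params,
                as_subflow=True,
            )

    return await asyncio.gather(*(run_one(p) for p in parameters_list))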
r
Thanks! I may check it out again 🙂