Robin
10/14/2024, 6:06 PMprefect.concurrency.sync.rate_limit()from prefect.concurrency.sync import concurrency, rate_limit
@task
def run_calibration(...):
    with concurrency("ECS Global Concurrency", occupy=1):
        run_deployment(...)
@flow
def run_calibrations_map(...):
    run_calibration.map(parameters)
@flow
def run_calibrations_submit(...):
    for param in parameters:
        run_calibration.submit(param)rate_limitconcurrencyRobin
10/14/2024, 11:47 PMprocessed_data = []
    while data_items:
        async with concurrency("data-processing", occupy=n_parallel_calibrations_runs):
            chunk = [data_items.pop() for _ in range(n_parallel_calibrations_runs)]
            processed_data += await asyncio.gather(
                *[
                    run_calibration(
                        system_id=item[0],
                        from_time=item[1],
                        to_time=item[2],
                        run_in_cloud=run_in_cloud,
                    )
                    for item in chunk
                ]
            )Robin
10/14/2024, 11:54 PMRobin
10/14/2024, 11:55 PMNate
10/15/2024, 12:45 AMrate_limitconcurrencyRobin
10/15/2024, 6:56 AMfrom prefect.concurrency.sync import concurrency, rate_limit
@task
def run_calibration(...):
    with concurrency("ECS Global Concurrency", occupy=1):
        run_deployment(...)
@flow
def run_calibrations_map(...):
    run_calibration.map(parameters)
@flow
def run_calibrations_submit(...):
    for param in parameters:
        run_calibration.submit(param)Robin
10/15/2024, 7:13 AMfrom prefect.concurrency.sync import concurrency, rate_limit
@task
def run_calibration(...):
    rate_limit()
    run_deployment(...)
@flow
def run_calibrations_map(...):
    run_calibration.map(parameters)
@flow
def run_calibrations_submit(...):
    for param in parameters:
        run_calibration.submit(param)Jainish Savalia
12/06/2024, 8:17 AMflow_tasks = [run_deployment(
    "abc/ABC",
     parameters=dict(),
     work_queue_name="xyz",
     as_subflow=True,
) for _ in range(10)]
runs = await asyncio.gather(*flow_tasks)Robin
12/06/2024, 4:49 PM