Robin
10/14/2024, 6:06 PM
prefect.concurrency.sync.rate_limit()
How do I best set up the flow so that 50 concurrent runs are possible?
Or are the rate limits so strict that it is not possible?
PS: We are currently still on Prefect 2; would switching to Prefect 3 reduce these issues?
###
Context:
I tried different options from the concurrency documentation:
from prefect.concurrency.sync import concurrency, rate_limit

@task
def run_calibration(...):
    with concurrency("ECS Global Concurrency", occupy=1):
        run_deployment(...)

@flow
def run_calibrations_map(...):
    run_calibration.map(parameters)

@flow
def run_calibrations_submit(...):
    for param in parameters:
        run_calibration.submit(param)
I also tried rate_limit instead of the concurrency context, but neither works stably 🤔
Yesterday it ran through once with run_calibrations_map and the concurrency context manager, but today it no longer does...
Robin
10/14/2024, 11:47 PM
processed_data = []
while data_items:
    async with concurrency("data-processing", occupy=n_parallel_calibrations_runs):
        chunk = [data_items.pop() for _ in range(n_parallel_calibrations_runs)]
        processed_data += await asyncio.gather(
            *[
                run_calibration(
                    system_id=item[0],
                    from_time=item[1],
                    to_time=item[2],
                    run_in_cloud=run_in_cloud,
                )
                for item in chunk
            ]
        )
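One detail of the loop above is worth noting: popping exactly n_parallel_calibrations_runs items per pass raises IndexError on the last pass whenever the number of items is not a multiple of the chunk size. A minimal sketch of a chunking helper that tolerates a shorter final chunk follows. The names chunked, run_calibration_stub, and process_in_chunks are hypothetical, the stub only stands in for the real run_calibration, and the Prefect concurrency block is left out so the sketch runs with the standard library alone. Unlike pop(), slicing keeps the items in their original order.

```python
import asyncio
from typing import Iterator, List, Sequence, Tuple

# (system_id, from_time, to_time), mirroring item[0..2] in the loop above
Item = Tuple[str, str, str]


def chunked(items: Sequence[Item], size: int) -> Iterator[List[Item]]:
    """Yield consecutive chunks of at most `size` items; the last may be shorter."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


async def run_calibration_stub(system_id: str, from_time: str, to_time: str) -> str:
    # Hypothetical stand-in for the real run_calibration call.
    await asyncio.sleep(0)
    return f"{system_id}:{from_time}-{to_time}"


async def process_in_chunks(items: Sequence[Item], size: int) -> List[str]:
    processed: List[str] = []
    for chunk in chunked(items, size):
        # In the original loop, the concurrency(...) block would wrap this gather.
        processed += await asyncio.gather(
            *[run_calibration_stub(*item) for item in chunk]
        )
    return processed
```

Each chunk still waits for its slowest member before the next chunk starts, so this only addresses the IndexError, not throughput.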
This seems odd; is there something I'm missing?
Robin
10/14/2024, 11:54 PM
Robin
10/14/2024, 11:55 PM
Nate
10/15/2024, 12:45 AM
rate_limit instead of the concurrency context, but neither works stably
do you have a minimal example of what you mean?
Robin
10/15/2024, 6:56 AM
from prefect.concurrency.sync import concurrency, rate_limit

@task
def run_calibration(...):
    with concurrency("ECS Global Concurrency", occupy=1):
        run_deployment(...)

@flow
def run_calibrations_map(...):
    run_calibration.map(parameters)

@flow
def run_calibrations_submit(...):
    for param in parameters:
        run_calibration.submit(param)
Robin
10/15/2024, 7:13 AM
from prefect.concurrency.sync import concurrency, rate_limit

@task
def run_calibration(...):
    rate_limit()
    run_deployment(...)

@flow
def run_calibrations_map(...):
    run_calibration.map(parameters)

@flow
def run_calibrations_submit(...):
    for param in parameters:
        run_calibration.submit(param)
Jainish Savalia
12/06/2024, 8:17 AM
flow_tasks = [
    run_deployment(
        "abc/ABC",
        parameters=dict(),
        work_queue_name="xyz",
        as_subflow=True,
    )
    for _ in range(10)
]
runs = await asyncio.gather(*flow_tasks)
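The gather call above starts all ten coroutines at once. If the goal is to launch many runs while keeping only a fixed number in flight, one generic option is to wrap each call in an asyncio.Semaphore. The sketch below is plain asyncio, not a Prefect global concurrency limit: fake_run_deployment is a hypothetical stand-in for run_deployment, and the tracker dict exists only to observe how many calls overlap.

```python
import asyncio
from typing import Dict, List, Tuple


async def fake_run_deployment(index: int, tracker: Dict[str, int]) -> int:
    # Hypothetical stand-in for run_deployment; records how many calls overlap.
    tracker["active"] += 1
    tracker["peak"] = max(tracker["peak"], tracker["active"])
    await asyncio.sleep(0.01)
    tracker["active"] -= 1
    return index


async def run_bounded(total: int, limit: int) -> Tuple[List[int], int]:
    """Start `total` calls but let at most `limit` of them run at the same time."""
    semaphore = asyncio.Semaphore(limit)
    tracker = {"active": 0, "peak": 0}

    async def guarded(index: int) -> int:
        # Each call waits here until one of the `limit` slots is free.
        async with semaphore:
            return await fake_run_deployment(index, tracker)

    results = await asyncio.gather(*(guarded(i) for i in range(total)))
    return list(results), tracker["peak"]
```

Calling asyncio.run(run_bounded(10, 3)) returns the results in submission order together with the observed peak overlap. The semaphore only limits calls within one process, so it does not coordinate across separate flow runs or workers.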
Robin
12/06/2024, 4:49 PM