Thomas Fredriksen
11/11/2022, 9:39 AM
@asynccontextmanager
async def concurrency_limit(task: Task, limit: int = 10) -> Task:
    """Create a tag-based concurrency limit for the span of an ``async with``.

    The tag is built from the current flow run id and the task name.  A
    concurrency limit is created for that tag, and the value handed to the
    ``async with`` body is ``task.with_options(tags=[tag])``.  The limit is
    deleted again when the body exits, whether or not it raised.

    Args:
        task: The task to tag.
        limit: Value passed to ``create_concurrency_limit`` (default 10).

    Yields:
        The task returned by ``task.with_options`` carrying the limit tag.
    """
    run_ctx = get_run_context()
    limit_tag = f"{run_ctx.flow_run.id}-{task.name}"
    api = get_client()
    await api.create_concurrency_limit(limit_tag, limit)
    tagged_task = task.with_options(tags=[limit_tag])
    try:
        yield tagged_task
    finally:
        # Runs as soon as the ``async with`` body exits; nothing in this
        # function waits for task runs that may still be in flight.
        await api.delete_concurrency_limit_by_tag(limit_tag)
@task
async def hello_number(nmbr: int):
    """Log a greeting containing ``nmbr``, then sleep for one second.

    Args:
        nmbr: Number interpolated into the log message.
    """
    log = get_run_logger()
    # Restored from chat-export auto-link markup
    # (``<http://log.info|log.info>``), which is not valid Python.
    log.info("Hello, my number is: %d", nmbr)
    await asyncio.sleep(1)
@flow(name="example_flow_concurrency_limit")
async def main():
    """Map ``hello_number`` over 0..19 inside a ``concurrency_limit`` of 1."""
    numbers = list(range(20))
    async with concurrency_limit(hello_number, 1) as limited_task:
        await limited_task.map(numbers)
Kalise Richmond
11/11/2022, 4:51 PM
Thomas Fredriksen
11/14/2022, 10:56 AM
await client.delete_concurrency_limit_by_tag(tag)
# from the code above produces the same result.
async def wait_for_concurrency_limit(
    client: OrionClient,
    tag: str,
    check_interval: float = 0.5,
    timeout: float = 0.0,
):
    """Poll until no task run tagged ``tag`` is in a non-terminal state.

    Each iteration reads the task runs matching ``tag`` and returns once
    every one of them has a state type in ``TERMINAL_STATES`` (this is also
    the case when no task runs match).  Otherwise it sleeps for
    ``check_interval`` seconds and polls again.

    Args:
        client: Client used to read task runs.
        tag: Tag the task runs are filtered by.
        check_interval: Seconds to sleep between polls.
        timeout: Maximum number of seconds to keep waiting.  Values ``<= 0``
            disable the timeout.  When the timeout elapses a warning is
            logged and the function returns normally.
    """
    # Monotonic clock: the elapsed-time check must not be affected by
    # wall-clock adjustments.
    started = time.monotonic()
    run_filter = TaskRunFilter(tags={"all_": [tag]})
    while True:
        if 0 < timeout < (time.monotonic() - started):
            LOG.warning("Concurrency wait timeout")
            return
        task_runs = await client.read_task_runs(task_run_filter=run_filter)
        # Stop as soon as nothing is active.  Without this exit the loop
        # would never terminate when ``timeout`` is disabled, and would
        # re-query the API without sleeping.
        if all(run.state.type in TERMINAL_STATES for run in task_runs):
            return
        await anyio.sleep(check_interval)
Dynamic concurrency limit context:
@asynccontextmanager
async def concurrency_limit(task: Task, limit: int = 10) -> Task:
    """Tag ``task`` with a temporary, flow-run-scoped concurrency limit.

    A concurrency limit is created for a tag made of the current flow run id
    and the task name, and ``task.with_options(tags=[tag])`` is yielded to
    the ``async with`` body.  If the body finishes without raising,
    ``wait_for_concurrency_limit`` is awaited for that tag before cleanup.
    The limit is deleted on exit in every case.

    Args:
        task: The task to tag.
        limit: Value passed to ``create_concurrency_limit`` (default 10).

    Yields:
        The task returned by ``task.with_options`` carrying the limit tag.
    """
    flow_run_id = get_run_context().flow_run.id
    scoped_tag = f"{flow_run_id}-{task.name}"
    orion = get_client()
    await orion.create_concurrency_limit(scoped_tag, limit)
    limited: Task = task.with_options(tags=[scoped_tag])
    try:
        yield limited
        # Reached only when the body exited normally; an exception raised in
        # the body skips the wait and goes straight to the cleanup below.
        await wait_for_concurrency_limit(orion, scoped_tag)
    finally:
        await orion.delete_concurrency_limit_by_tag(scoped_tag)
Mason Menges
11/15/2022, 7:56 PM
Thomas Fredriksen
11/15/2022, 7:57 PM
Mason Menges
12/06/2022, 4:19 PM
Thomas Fredriksen
12/06/2022, 4:29 PM