Hi! I'm trying to limit tasks concurrent access to...
# ask-community
m
Hi! I'm trying to limit tasks concurrent access to a resource using
anyio.CapacityLimiter
but after some tasks finish the flow seems to hang and no more tasks execute, can you help me? (code example and logs in thread)
Copy code
import asyncio

import anyio
from anyio import CapacityLimiter
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@task(log_prints=True)
async def task_with_lock(lock: CapacityLimiter):
    async with lock:
        await anyio.sleep(1)
        print("Stop sleeping")


@flow(task_runner=ConcurrentTaskRunner())
async def flow_with_lock():
    lock = CapacityLimiter(2)
    for _ in range(5):
        await task_with_lock.submit(lock)


if __name__ == '__main__':
    asyncio.run(flow_with_lock())
Copy code
13:55:21.634 | INFO    | prefect.engine - Created flow run 'accurate-moose' for flow 'flow-with-lock'
13:55:21.732 | INFO    | Flow run 'accurate-moose' - Created task run 'task_with_lock-0' for task 'task_with_lock'
13:55:21.732 | INFO    | Flow run 'accurate-moose' - Submitted task run 'task_with_lock-0' for execution.
13:55:22.779 | INFO    | Task run 'task_with_lock-0' - Stop sleeping
13:55:22.830 | INFO    | Task run 'task_with_lock-0' - Finished in state Completed()
13:55:26.786 | INFO    | Flow run 'accurate-moose' - Created task run 'task_with_lock-4' for task 'task_with_lock'
13:55:26.787 | INFO    | Flow run 'accurate-moose' - Submitted task run 'task_with_lock-4' for execution.
13:55:26.800 | INFO    | Flow run 'accurate-moose' - Created task run 'task_with_lock-2' for task 'task_with_lock'
13:55:26.801 | INFO    | Flow run 'accurate-moose' - Submitted task run 'task_with_lock-2' for execution.
13:55:26.826 | INFO    | Flow run 'accurate-moose' - Created task run 'task_with_lock-1' for task 'task_with_lock'
13:55:26.827 | INFO    | Flow run 'accurate-moose' - Submitted task run 'task_with_lock-1' for execution.
13:55:26.851 | INFO    | Flow run 'accurate-moose' - Created task run 'task_with_lock-3' for task 'task_with_lock'
13:55:26.852 | INFO    | Flow run 'accurate-moose' - Submitted task run 'task_with_lock-3' for execution.
13:55:27.904 | INFO    | Task run 'task_with_lock-4' - Stop sleeping
13:55:27.928 | INFO    | Task run 'task_with_lock-2' - Stop sleeping
13:55:27.957 | INFO    | Task run 'task_with_lock-4' - Finished in state Completed()
13:55:27.975 | INFO    | Task run 'task_with_lock-2' - Finished in state Completed()
d
Try this using the
asyncio.Semaphore
instead. You will notice an error regarding which event loop the semaphore is bound to. May want to look into whether
anyio
handles this issue, or if it causes things to hang.
m
As you said using
asyncio.Semaphore
raises:
Copy code
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py:182: in wait_for_call_in_loop_thread
    return call.result()
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:283: in result
    return self.future.result(timeout=timeout)
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:169: in result
    return self.__get_result()
/usr/local/lib/python3.9/concurrent/futures/_base.py:389: in __get_result
    raise self._exception
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:346: in _run_async
    result = await coro
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/client/utilities.py:40: in with_injected_client
    return await fn(*args, **kwargs)
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/engine.py:311: in create_then_begin_flow_run
    return await state.result(fetch=True)
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/states.py:91: in _get_state_result
    raise await get_state_exception(state)
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/engine.py:1655: in orchestrate_task_run
    result = await call.aresult()
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:292: in aresult
    return await asyncio.wrap_future(self.future)
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:346: in _run_async
    result = await coro
test_a.py:15: in task_with_lock
    async with lock:
/usr/local/lib/python3.9/asyncio/locks.py:14: in __aenter__
    await self.acquire()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asyncio.locks.Semaphore object at 0xffffb2405d90 [unlocked, value:2]>

    async def acquire(self):
        """Acquire a semaphore.
    
        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        while self._value <= 0:
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
>               await fut
E               RuntimeError: Task <Task pending name='Task-49' coro=<Call._run_async() running at /opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:346> cb=[_run_until_complete_cb() at /usr/local/lib/python3.9/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

/usr/local/lib/python3.9/asyncio/locks.py:413: RuntimeError
Is it possible to use synchronization primitives inside prefect flows and tasks?
d
I really don't know. Need to ask the development team or add an issue in GitHub
m
Did you ever figure out if it is possible to use synchronization primitives inside prefect flows and tasks? Using anyio.Lock works locally, but will hang indefinitely for some tasks when deployed to Kubernetes.
m
Hi! Sorry for the late response, I fixed it using prefect limits