Marcos
06/13/2023, 1:54 PM
anyio.CapacityLimiter but after some tasks finish the flow seems to hang and no more tasks execute, can you help me? (code example and logs in thread)
Marcos
06/13/2023, 1:55 PM
import asyncio
import anyio
from anyio import CapacityLimiter
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task(log_prints=True)
async def task_with_lock(lock: CapacityLimiter):
    async with lock:
        await anyio.sleep(1)
        print("Stop sleeping")

@flow(task_runner=ConcurrentTaskRunner())
async def flow_with_lock():
    lock = CapacityLimiter(2)
    for _ in range(5):
        await task_with_lock.submit(lock)

if __name__ == '__main__':
    asyncio.run(flow_with_lock())
Marcos
06/13/2023, 1:56 PM
13:55:21.634 | INFO | prefect.engine - Created flow run 'accurate-moose' for flow 'flow-with-lock'
13:55:21.732 | INFO | Flow run 'accurate-moose' - Created task run 'task_with_lock-0' for task 'task_with_lock'
13:55:21.732 | INFO | Flow run 'accurate-moose' - Submitted task run 'task_with_lock-0' for execution.
13:55:22.779 | INFO | Task run 'task_with_lock-0' - Stop sleeping
13:55:22.830 | INFO | Task run 'task_with_lock-0' - Finished in state Completed()
13:55:26.786 | INFO | Flow run 'accurate-moose' - Created task run 'task_with_lock-4' for task 'task_with_lock'
13:55:26.787 | INFO | Flow run 'accurate-moose' - Submitted task run 'task_with_lock-4' for execution.
13:55:26.800 | INFO | Flow run 'accurate-moose' - Created task run 'task_with_lock-2' for task 'task_with_lock'
13:55:26.801 | INFO | Flow run 'accurate-moose' - Submitted task run 'task_with_lock-2' for execution.
13:55:26.826 | INFO | Flow run 'accurate-moose' - Created task run 'task_with_lock-1' for task 'task_with_lock'
13:55:26.827 | INFO | Flow run 'accurate-moose' - Submitted task run 'task_with_lock-1' for execution.
13:55:26.851 | INFO | Flow run 'accurate-moose' - Created task run 'task_with_lock-3' for task 'task_with_lock'
13:55:26.852 | INFO | Flow run 'accurate-moose' - Submitted task run 'task_with_lock-3' for execution.
13:55:27.904 | INFO | Task run 'task_with_lock-4' - Stop sleeping
13:55:27.928 | INFO | Task run 'task_with_lock-2' - Stop sleeping
13:55:27.957 | INFO | Task run 'task_with_lock-4' - Finished in state Completed()
13:55:27.975 | INFO | Task run 'task_with_lock-2' - Finished in state Completed()
Dominic Tarro
06/13/2023, 6:03 PM
asyncio.Semaphore instead. You will notice an error regarding which event loop the semaphore is bound to. May want to look into whether anyio handles this issue, or if it causes things to hang.
Marcos
06/13/2023, 6:35 PM
asyncio.Semaphore raises:
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py:182: in wait_for_call_in_loop_thread
return call.result()
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:283: in result
return self.future.result(timeout=timeout)
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:169: in result
return self.__get_result()
/usr/local/lib/python3.9/concurrent/futures/_base.py:389: in __get_result
raise self._exception
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:346: in _run_async
result = await coro
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/client/utilities.py:40: in with_injected_client
return await fn(*args, **kwargs)
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/engine.py:311: in create_then_begin_flow_run
return await state.result(fetch=True)
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/states.py:91: in _get_state_result
raise await get_state_exception(state)
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/engine.py:1655: in orchestrate_task_run
result = await call.aresult()
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:292: in aresult
return await asyncio.wrap_future(self.future)
/opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:346: in _run_async
result = await coro
test_a.py:15: in task_with_lock
async with lock:
/usr/local/lib/python3.9/asyncio/locks.py:14: in __aenter__
await self.acquire()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <asyncio.locks.Semaphore object at 0xffffb2405d90 [unlocked, value:2]>
    async def acquire(self):
        """Acquire a semaphore.
        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately. If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        while self._value <= 0:
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
>               await fut
E RuntimeError: Task <Task pending name='Task-49' coro=<Call._run_async() running at /opt/poetry/.venv/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py:346> cb=[_run_until_complete_cb() at /usr/local/lib/python3.9/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop
/usr/local/lib/python3.9/asyncio/locks.py:413: RuntimeError
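The RuntimeError at the end of this traceback can be reproduced without Prefect, which may help show what "attached to a different loop" means. The sketch below is only an illustration: the function name `await_future_from_other_loop` is made up for this example, and it assumes nothing about how Prefect schedules tasks internally. It creates a future on one event loop and awaits it from a task running on a second loop, which is the same situation the semaphore's internal waiter future ends up in above.

```python
import asyncio


def await_future_from_other_loop() -> str:
    """Hypothetical helper: await a future owned by loop A from loop B."""
    loop_a = asyncio.new_event_loop()
    fut = loop_a.create_future()  # this future belongs to loop_a

    async def waiter() -> str:
        try:
            # The task running this coroutine belongs to the loop created
            # by asyncio.run() below, not to loop_a.
            await fut
        except RuntimeError as exc:
            return str(exc)
        return "no error"

    try:
        # asyncio.run() starts a fresh event loop (loop B) for waiter().
        return asyncio.run(waiter())
    finally:
        loop_a.close()


if __name__ == "__main__":
    print(await_future_from_other_loop())
```

The returned message has the same "got Future ... attached to a different loop" wording as the error above, which suggests the semaphore was created on one loop and awaited from a task running on another.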
Marcos
06/13/2023, 6:37 PM
Dominic Tarro
06/13/2023, 6:54 PM
Maity
08/09/2023, 2:25 AM
Marcos
10/04/2023, 2:52 AM
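As a follow-up to the loop-binding error discussed in this thread, here is a minimal sketch of a limiter that is not tied to any event loop. It is not a confirmed fix for the hang: `CrossLoopLimiter`, `slot`, and `run_demo` are hypothetical names invented for this example, none of them come from Prefect or anyio, and whether Prefect passes such an object through to tasks unchanged has not been verified here. The idea is to keep the permit count in a `threading.BoundedSemaphore` and to do the blocking `acquire` in a worker thread, so the calling event loop is never blocked.

```python
import asyncio
import threading
from contextlib import asynccontextmanager


class CrossLoopLimiter:
    """Hypothetical helper, not part of Prefect or anyio."""

    def __init__(self, limit: int) -> None:
        # A threading semaphore has no event-loop affinity.
        self._sem = threading.BoundedSemaphore(limit)

    @asynccontextmanager
    async def slot(self):
        loop = asyncio.get_running_loop()
        # Block in the loop's default thread pool, not on the loop itself.
        # Caveat: if this await is cancelled while the worker thread is
        # still waiting, a permit may be acquired later and never released.
        await loop.run_in_executor(None, self._sem.acquire)
        try:
            yield
        finally:
            self._sem.release()


def run_demo(limit: int = 2, loops: int = 2, tasks_per_loop: int = 3) -> dict:
    """Run workers on several event loops (one per thread) sharing one limiter."""
    limiter = CrossLoopLimiter(limit)
    stats = {"active": 0, "peak": 0, "done": 0}
    stats_lock = threading.Lock()

    async def worker() -> None:
        async with limiter.slot():
            with stats_lock:
                stats["active"] += 1
                stats["peak"] = max(stats["peak"], stats["active"])
            await asyncio.sleep(0.05)
            with stats_lock:
                stats["active"] -= 1
                stats["done"] += 1

    async def main() -> None:
        await asyncio.gather(*(worker() for _ in range(tasks_per_loop)))

    # Each thread gets its own event loop via asyncio.run().
    threads = [
        threading.Thread(target=lambda: asyncio.run(main())) for _ in range(loops)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return stats


if __name__ == "__main__":
    print(run_demo())
```

In this demo all workers finish and the recorded peak never exceeds the limit, even though the workers run on different event loops. The trade-off is that every waiting task occupies a thread in the default executor while it waits, so this only suits small numbers of concurrent waiters.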