Anh Pham
03/23/2024, 11:39 AMEncountered exception during execution:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 855, in orchestrate_flow_run
from_async.call_soon_in_waiting_thread(
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 125, in call_soon_in_waiting_thread
waiter.submit(call)
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/waiters.py", line 130, in submit
self._queue.put_nowait(call)
AttributeError: 'SyncWaiter' object has no attribute '_queue'
My flow & subflow setup is something like this:
@task
def dummy_task(j: int):
return random.randint(0, j)
@flow
async def child_flow(i: int, n_tasks: int):
return dummy_task.map(j=[random.randint(1, i) for _ in range(n_task)])
@flow
async def parent_flow(n_subflows: int, n_tasks_per_subflow: int):
await asyncio.gather(
*[
child_flow(i, n_tasks_per_subflow)
for i in range(n_subflows)
]
)
Does anyone face the same issue?
Also, my initial guess this is related to concurrency module and Waiters not resolved properly for thread.
@Jake Kaplan I saw you work with this part of prefect codebase recently, do you have a minute to see what might be wrong here?Jake Kaplan
03/23/2024, 1:44 PM