https://prefect.io logo
Title
t

Tim-Oliver

11/23/2022, 2:11 PM
Hello, I am running into an
asyncio
exception when using
DaskTaskRunner
which leads to tasks hanging and not completing. -->
13:44:03.390 | INFO    | Task run 'run_sofima-57be2d9c-1187' - Compute mesh for section test-experiment/Sample/s2098_g1.
13:44:07.972 | ERROR   | asyncio - Exception in callback WorkerThread._report_result(<Future pendi...c62b6bc40>()]>, None, StopIteration())
handle: <Handle WorkerThread._report_result(<Future pendi...c62b6bc40>()]>, None, StopIteration())>
Traceback (most recent call last):
  File "/tungstenfs/scratch/gmicro_share/_prefect/miniconda3/envs/test_gfriedri-em-alignment-flows/lib/python3.9/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/tungstenfs/scratch/gmicro_share/_prefect/miniconda3/envs/test_gfriedri-em-alignment-flows/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 849, in _report_result
    future.set_exception(exc)
TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
The worker then continues to complete other tasks submitted to it. But this failed tasks stays pending indefinitely which blocks the whole flow eventually.
b

Bianca Hoch

11/23/2022, 9:48 PM
I agree with your assessment that it is related to the error you posted. It looks like it may be fixed in a future version of AnyIO (ver 4.0.0).
t

Tim-Oliver

11/24/2022, 1:50 PM
For now I am catching the
StopIteration
exception inside the task and raise a
RunTime
exception. Not sure if it makes sense to add this "fix" to the prefect task until AnyIO 4.0 is released.
k

Kate Weber

01/17/2023, 7:02 PM
@Tim-Oliver Can you help me understand this solution a little better? I'm wrapping the call to the function where it dies in
try/except
and it's still dying on me
This is happening with a relatively straight-forward SequentialTaskRunner:
19:17:21.709 | ERROR   | asyncio - Exception in callback WorkerThread._report_result(<Future pendi...0fdcc64c0>()]>, None, StopIteration())
handle: <Handle WorkerThread._report_result(<Future pendi...0fdcc64c0>()]>, None, StopIteration())>                                                                                                                                           
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/big/.virtualenvs/kate_tooling/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 849, in _report_result
    future.set_exception(exc)
TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
t

Tim-Oliver

01/18/2023, 7:18 AM
This is where I am wrapping the function call: https://github.com/fmi-basel/gfriedri-em-alignment-flows/blob/94d8332470e248f54b7c20d21e4b151803f748bc/flows/sofima_tasks/sofima_tasks.py#L48 And here I call the task inside the flow: https://github.com/fmi-basel/gfriedri-em-alignment-flows/blob/94d8332470e248f54b7c20d21e4b151803f748bc/flows/tile_registration.py#L184 I am using
DaskTaskRunner
and
map
. This "fix" will only fix the issue I had with flows staying in a running state forever. It turns the flow into a failed state due to the raised
RunTimeException
. PS: Please don´t judge the code quality. This repo is still under heavy construction in at least 5 places.