# ask-community
s
I'm using an async producer/consumer model with anyio memory channels. When running under prefect, the producer is in the flow and feeds items to a channel that is consumed, processed, and then passed through a couple more channels and finally written out. When I run the same code without flow/task annotations (so not prefect), the code runs as expected. However, under prefect, I'm receiving a "random" but reproducible segmentation fault. Any thoughts? Stack trace in thread.
Thread 0x00000001757e3000 (most recent call first):
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/concurrent/futures/thread.py", line 81 in _worker
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 975 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 1038 in _bootstrap_inner
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 995 in _bootstrap

Current thread 0x00000001747d7000 (most recent call first):
  File "/Users/seandavis/Documents/git/infra/prefect-infra/geo_flow.py", line 125 in write_geo_entity_worker
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/events.py", line 80 in _run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 1922 in _run_once
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 607 in run_forever
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 640 in run_until_complete
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/runners.py", line 118 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/runners.py", line 190 in run
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 274 in run
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/threads.py", line 105 in _run_until_shutdown
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/threads.py", line 90 in _entrypoint
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 975 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 1038 in _bootstrap_inner
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 995 in _bootstrap

Thread 0x00000001737cb000 (most recent call first):
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 320 in wait
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/queue.py", line 171 in get
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318 in _run_sync
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 248 in run
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/threads.py", line 105 in _run_until_shutdown
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/threads.py", line 90 in _entrypoint
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 975 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 1038 in _bootstrap_inner
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 995 in _bootstrap

Thread 0x00000001727bf000 (most recent call first):
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 324 in wait
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/queue.py", line 180 in get
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318 in _run_sync
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 248 in run
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/threads.py", line 105 in _run_until_shutdown
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/threads.py", line 90 in _entrypoint
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 975 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 1038 in _bootstrap_inner
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 995 in _bootstrap

Thread 0x00000001717b3000 (most recent call first):
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/selectors.py", line 561 in select
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 1884 in _run_once
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 607 in run_forever
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 640 in run_until_complete
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/runners.py", line 118 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/runners.py", line 190 in run
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 204 in run
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/anyio/_core/_eventloop.py", line 68 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/concurrent/futures/thread.py", line 58 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/concurrent/futures/thread.py", line 83 in _worker
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 975 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 1038 in _bootstrap_inner
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 995 in _bootstrap

Thread 0x000000017022f000 (most recent call first):
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/selectors.py", line 561 in select
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 1884 in _run_once
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 607 in run_forever
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 640 in run_until_complete
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/runners.py", line 118 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/runners.py", line 190 in run
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/prefect/_internal/concurrency/threads.py", line 198 in _entrypoint
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 975 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 1038 in _bootstrap_inner
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 995 in _bootstrap

Thread 0x00000001e2df9000 (most recent call first):
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/selectors.py", line 561 in select
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 1884 in _run_once
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 607 in run_forever
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/base_events.py", line 640 in run_until_complete
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/runners.py", line 118 in run
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/asyncio/runners.py", line 190 in run
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 204 in run
  File "/Users/seandavis/Documents/git/infra/prefect-infra/venv/lib/python3.11/site-packages/anyio/_core/_eventloop.py", line 68 in run
  File "/Users/seandavis/Documents/git/infra/prefect-infra/geo_flow.py", line 271 in <module>

Extension modules: pendulum.parsing._iso8601, pendulum._extensions._helpers, pydantic.typing, pydantic.errors, pydantic.version, pydantic.utils, pydantic.class_validators, pydantic.config, pydantic.color, pydantic.datetime_parse, pydantic.validators, pydantic.networks, pydantic.types, pydantic.json, pydantic.error_wrappers, pydantic.fields, pydantic.parse, pydantic.schema, pydantic.main, pydantic.dataclasses, pydantic.annotated_types, pydantic.decorator, pydantic.env_settings, pydantic.tools, pydantic, websockets.speedups, yaml._yaml, ruamel.yaml.clib._ruamel_yaml, _ruamel_yaml, ujson, simplejson._speedups, charset_normalizer.md, tornado.speedups, numpy.core._multiarray_umath, numpy.core._multiarray_tests, numpy.linalg._umath_linalg, numpy.fft._pocketfft_internal, numpy.random._common, numpy.random.bit_generator, numpy.random._bounded_integers, numpy.random._mt19937, numpy.random.mtrand, numpy.random._philox, numpy.random._pcg64, numpy.random._sfc64, numpy.random._generator, sqlalchemy.cimmutabledict, greenlet._greenlet, sqlalchemy.cprocessors, sqlalchemy.cresultproxy, _cffi_backend, markupsafe._speedups, asyncpg.pgproto.pgproto, asyncpg.protocol.protocol (total: 54)
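The dump above has the shape of output from Python's standard-library `faulthandler` module (per-thread stacks followed by an "Extension modules" line). A minimal sketch of capturing such dumps to a file rather than the terminal, so they survive a crash and can be compared across runs. The names `trace_log` and `dump_now` are illustrative and not from the original code.

```python
import faulthandler
import tempfile

# Keep this file object alive for the whole process: faulthandler writes
# directly to its file descriptor when a fatal signal arrives.
trace_log = tempfile.NamedTemporaryFile("w", suffix=".log", delete=False)

# On SIGSEGV and other fatal signals, dump the Python stack of every thread.
faulthandler.enable(file=trace_log, all_threads=True)


def dump_now():
    """Write the stacks of all threads right now and return the log contents."""
    faulthandler.dump_traceback(file=trace_log, all_threads=True)
    with open(trace_log.name) as fh:
        return fh.read()


if __name__ == "__main__":
    print(dump_now())
```

Calling `dump_now()` from inside a worker shortly before the suspected crash point gives a baseline of which threads exist, which can then be compared with the dump written at the time of the segfault.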
d
Interesting. Are you able to tell how many event loops are active and which the flow is running on vs your channels?
s
I can try to figure that out with some more debugging.
My intention was to be running on the same event loop as the flow.
I'm running the flow using:
if __name__ == "__main__":
    anyio.run(main_flow, datetime.date(2000, 1, 1), datetime.date(2007, 1, 1))
d
I only ask because I had issues with asynchronous Prefect long ago where there's a conflict with an awaitable. I'm struggling to remember the details, but I believe it was a semaphore being passed to a Prefect Task that was on a separate event loop.
The fact that there are multiple threads within your flow run makes me think you have multiple event loops running
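One way to check which thread and event loop each piece of code runs on is to log both from inside the coroutines. The sketch below uses only the standard library and assumes nothing about Prefect internals; `where_am_i` and `worker` are hypothetical helpers. It deliberately starts a second loop in a worker thread to show what a mismatch looks like.

```python
import asyncio
import threading


def where_am_i(label):
    """Return (label, thread id, id of the running event loop) and print it."""
    loop = asyncio.get_running_loop()
    info = (label, threading.get_ident(), id(loop))
    print(f"{label}: thread={info[1]:#x} loop={info[2]:#x}")
    return info


async def worker(label):
    return where_am_i(label)


async def main():
    # Awaited directly: runs on the same thread and loop as main().
    same = await worker("same-loop")
    # asyncio.run in a worker thread starts a second, independent event loop.
    other = await asyncio.to_thread(asyncio.run, worker("other-loop"))
    here = where_am_i("main")
    return here, same, other


if __name__ == "__main__":
    asyncio.run(main())
```

Calling something like `where_am_i` in the flow body and in each channel worker would show whether the streams are created on one loop and used on another.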
s
Good point on the thread ids.
n
hey @Sean Davis - what's `prefect version` for you? I'm curious if this is pre/post us moving (non-`.submit`ed) tasks to the main thread
I'd also be curious for an MRE if you have one to share
s
Version 2.14.16.
n
If you could share an MRE i'd be happy to take pyspy for a spin on your example
๐Ÿ‘ 1
p
There are some changes to use a single event loop here: https://github.com/PrefectHQ/prefect/pull/11930 it is not released yet.
n
yep that's what i was alluding to above, but note that after this change we only run tasks on the main thread if you do not call `.submit` on them, which is why i was curious to see the code, since `__call__` vs `.submit` would make a difference (if on the newest unreleased version)
s
Thanks, @nate and @Peter Bygrave. The segfault is reproducible but random. Let me do a little more local work to see if I can produce a better MRE and try to isolate things a bit further. I'd note that, in my case, the workflow is a single task that is using anyio memory object streams for concurrency (with a producer, processor, and consumer/writer).
๐Ÿ‘ 1
p
> we only run tasks on the main thread if you do not call `.submit` on them
Ah cool, thanks for that info
2.15.0 is now out with the fix above (not entirely sure if it will help or not though 🙂)
s
Thanks, @Peter Bygrave. I'm standing up the new version today.
c
@adam aditama