https://prefect.io logo
Title
m

Marc Lipoff

02/17/2023, 6:32 PM
I'm getting an error in my flows, that I can't figure out.
ERROR   | Flow run 'jovial-okapi' - Crash detected! Execution was interrupted by an unexpected exception: RuntimeError: dictionary changed size during iteration
More of the stack trace:
18:31:11.374 | ERROR   | Flow run 'jovial-okapi' - Crash detected! Execution was interrupted by an unexpected exception: RuntimeError: dictionary changed size during iteration

18:31:11.425 | DEBUG   | httpx._client - HTTP Request: POST <http://orion:4200/api/flow_runs/71bc9fe6-4839-4458-ad85-c6386b2d0240/set_state> "HTTP/1.1 201 Created"
Traceback (most recent call last):
  File "/root/.prefect/flows/import_first_health/import_first_health.py", line 107, in <module>
    main()
  File "/usr/local/lib/python3.11/site-packages/prefect/flows.py", line 456, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 170, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 243, in create_then_begin_flow_run
    state = await begin_flow_run(
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 347, in begin_flow_run
    async with AsyncExitStack() as stack:
  File "/usr/local/lib/python3.11/contextlib.py", line 730, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.11/contextlib.py", line 222, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1674, in report_flow_run_crashes
    raise exc from None
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1647, in report_flow_run_crashes
    yield
  File "/usr/local/lib/python3.11/contextlib.py", line 713, in __aexit__
    cb_suppress = await cb(*exc_details)
                  ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1155, in create_task_run_then_submit
    task_run = await create_task_run(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1190, in create_task_run
    task_inputs = {k: await collect_task_run_inputs(v) for k, v in parameters.items()}
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1190, in <dictcomp>
    task_inputs = {k: await collect_task_run_inputs(v) for k, v in parameters.items()}
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1054, in collect_task_run_inputs
    await run_sync_in_worker_thread(
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 334, in visit_collection
    items = [visit_nested(getattr(expr, key)) for key in model_fields]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 334, in <listcomp>
    items = [visit_nested(getattr(expr, key)) for key in model_fields]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 264, in visit_nested
    return visit_collection(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 334, in visit_collection
    items = [visit_nested(getattr(expr, key)) for key in model_fields]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 334, in <listcomp>
    items = [visit_nested(getattr(expr, key)) for key in model_fields]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 264, in visit_nested
    return visit_collection(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 318, in visit_collection
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 318, in <listcomp>
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
                               ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 264, in visit_nested
    return visit_collection(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 334, in visit_collection
    items = [visit_nested(getattr(expr, key)) for key in model_fields]
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 334, in <listcomp>
    items = [visit_nested(getattr(expr, key)) for key in model_fields]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/collections.py", line 264, in visit_nested
    return visit_collection(
           ^^^^^^^^^^^^^^^^^
I was not getting it before, its something new as of today. Any ideas?
At times, the same flow in the same env will just hang
18:43:53.347 | DEBUG   | httpx._client - HTTP Request: POST <http://orion:4200/api/task_runs/5a88a407-69dd-4d19-a7c8-7558dbb94a87/set_state> "HTTP/1.1 201 Created"
18:43:53.348 | INFO    | Task run 'first_health_degrref-get-schema-0' - Finished in state Completed()
18:43:54.339 | DEBUG   | asyncio - Using selector: EpollSelector
18:43:54.375 | DEBUG   | httpx._client - HTTP Request: POST <http://orion:4200/api/logs/> "HTTP/1.1 201 Created"
18:43:56.378 | DEBUG   | asyncio - Using selector: EpollSelector
18:43:58.384 | DEBUG   | asyncio - Using selector: EpollSelector
18:44:00.389 | DEBUG   | asyncio - Using selector: EpollSelector
18:44:02.396 | DEBUG   | asyncio - Using selector: EpollSelector
18:44:04.400 | DEBUG   | asyncio - Using selector: EpollSelector
18:44:06.403 | DEBUG   | asyncio - Using selector: EpollSelector
18:44:08.411 | DEBUG   | asyncio - Using selector: EpollSelector
18:44:10.414 | DEBUG   | asyncio - Using selector: EpollSelector
18:44:12.419 | DEBUG   | asyncio - Using selector: EpollSelector
z

Zanie

02/21/2023, 7:43 PM
This is a pretty weird one
m

Marc Lipoff

02/21/2023, 7:43 PM
I was able to figure it out -- i was/am trying to pass the context around. it was causing lots of issues
z

Zanie

02/21/2023, 7:43 PM
What kind of data are you passing around?
Ah I see
m

Marc Lipoff

02/21/2023, 7:44 PM
i instead converted the context to dict, and went from there
z

Zanie

02/21/2023, 7:44 PM
Yeah I would strongly recommend not passing the flow run context around in entirety
There’s a lot of internal tracking of task run futures and such in there
e.g. it’s going to cause you problems if you pass the set of all task run futures into a task since we automatically resolve futures into data
m

Marc Lipoff

02/21/2023, 7:46 PM
got it!