Malthe Karbo
04/21/2022, 6:52 AM
Anna Geller
PREFECT_LOGGING_LEVEL=WARNING python community_bug.py
@Marvin open "When returning large objects, send_logs raises an error `RuntimeError: Set changed size during iteration`"
Marvin
04/21/2022, 11:30 AM
Malthe Karbo
04/21/2022, 7:19 PM
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task
def slow_task_big_return() -> list[dict]:
    return [{"x": i} for i in range(10000000)]

@task
def fast_task_uses_input(input: list[dict]):
    return True

@flow(task_runner=SequentialTaskRunner())
def failing_flow():
    results = slow_task_big_return()
    fast_task_uses_input(results)

failing_flow()
will cause
08:45:56.915 | INFO | prefect.engine - Created flow run 'miniature-muskox' for flow 'failing-flow'
08:45:56.916 | INFO | Flow run 'miniature-muskox' - Using task runner 'SequentialTaskRunner'
08:45:57.069 | WARNING | Flow run 'miniature-muskox' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
08:45:57.488 | INFO | Flow run 'miniature-muskox' - Created task run 'slow_task_big_return-36221a7f-0' for task 'slow_task_big_return'
Exception in thread orion-log-worker:
Traceback (most recent call last):
  File "/usr/lib/python3.9/threading.py", line 954, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.9/threading.py", line 892, in run
    self._target(*self._args, **self._kwargs)
  File "/path/.venv/lib/python3.9/site-packages/prefect/logging/handlers.py", line 82, in _send_logs_loop
    anyio.run(self.send_logs)
  File "/path/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
    return asynclib.run(func, *args, **backend_options)
  File "/path/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.9/asyncio/runners.py", line 47, in run
    _cancel_all_tasks(loop)
  File "/usr/lib/python3.9/asyncio/runners.py", line 56, in _cancel_all_tasks
    to_cancel = tasks.all_tasks(loop)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 53, in all_tasks
    tasks = list(_all_tasks)
  File "/usr/lib/python3.9/_weakrefset.py", line 61, in __iter__
    for itemref in self.data:
RuntimeError: Set changed size during iteration
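For context on the error message itself: Python raises this `RuntimeError` whenever a set gains or loses elements while it is being iterated. The sketch below is a generic single-threaded illustration, not Prefect code; the function names are hypothetical. In the traceback above, the set being iterated lives inside a `WeakSet` in asyncio, so the modification presumably came from outside the iterating code (for example another thread), which is an inference from the traceback rather than something confirmed in this thread.

```python
def mutate_while_iterating() -> str:
    """Add to a set while looping over it and return the resulting error text."""
    items = {1, 2, 3}
    try:
        for item in items:
            # Changing the set's size mid-iteration is what triggers the error.
            items.add(item + 100)
    except RuntimeError as exc:
        return str(exc)
    return "no error"


def mutate_using_snapshot() -> set:
    """Iterate over a list copy so the original set can be modified safely."""
    items = {1, 2, 3}
    for item in list(items):
        items.add(item + 100)
    return items


if __name__ == "__main__":
    print(mutate_while_iterating())
    print(sorted(mutate_using_snapshot()))
```

Note that copying to a list only helps when the modification happens in the same thread; `list(_all_tasks)` in the traceback is itself such a copy, and it still failed because the set changed while the copy was being made.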
Moreover, the process then hangs forever, and if you are on Orion cloud it creates a never-ending flow run.
Anna Geller
Malthe Karbo
04/21/2022, 8:27 PM