# prefect-community
m
Hi, when you have a task (Orion) that returns a large output (or is slow to return), it causes a hiccup in the Prefect log handlers (see chat for details). I am using the latest beta release, 2.03b.
a
could you move the code blocks to the thread to keep the main channel cleaner?
Thanks for reporting that. I was able to reproduce the error. With smaller numbers, up to 100,000, it was fine, but going to a million was already enough to get my "intelligent-cow" flow run stuck 🙂 I checked that the first task returning a large list already causes the issue, so it doesn't seem to be related to passing data to downstream tasks. The issue is also the same regardless of which task runner you assign (as expected). Finally, I was able to confirm that the problem is with logs: running the flow with only WARNING-level logs worked, so you can use that as a temporary workaround:
```shell
PREFECT_LOGGING_LEVEL=WARNING python community_bug.py
```
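If you launch the flow script from Python rather than from a shell, the same workaround can be applied by passing a modified environment to the child process. This is a minimal sketch, assuming the script is still called `community_bug.py`; the helper names `warning_only_env` and `run_with_warning_logs` are hypothetical, and the only Prefect-specific piece is the `PREFECT_LOGGING_LEVEL` variable from the one-liner above.

```python
import os
import subprocess
import sys
from typing import Mapping, Optional


def warning_only_env(base: Optional[Mapping[str, str]] = None) -> dict:
    """Return a copy of `base` (default: os.environ) with Prefect logging limited to WARNING."""
    env = dict(os.environ if base is None else base)
    # Same setting as the shell one-liner: PREFECT_LOGGING_LEVEL=WARNING
    env["PREFECT_LOGGING_LEVEL"] = "WARNING"
    return env


def run_with_warning_logs(script: str = "community_bug.py") -> int:
    """Run `script` in a child Python process with the workaround applied; return its exit code."""
    completed = subprocess.run([sys.executable, script], env=warning_only_env())
    return completed.returncode


if __name__ == "__main__":
    sys.exit(run_with_warning_logs())
```

Copying the environment instead of mutating `os.environ` keeps the workaround scoped to the child process, so it is easy to drop once the log handler is fixed.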
@Marvin open "When returning large objects, send_logs raises an error `RuntimeError: Set changed size during iteration`"
m
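Yes! Running the sample below:
```python
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner


@task
def slow_task_big_return() -> list[dict]:
    # Builds a list of 10 million dicts; large enough to reproduce the log-handler error
    return [{"x": i} for i in range(10000000)]


@task
def fast_task_uses_input(input: list[dict]):
    # Receives the large result from the upstream task and returns immediately
    return True


@flow(task_runner=SequentialTaskRunner())
def failing_flow():
    results = slow_task_big_return()
    fast_task_uses_input(results)


if __name__ == "__main__":
    failing_flow()
```
produces the following output: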
```
08:45:56.915 | INFO    | prefect.engine - Created flow run 'miniature-muskox' for flow 'failing-flow'
08:45:56.916 | INFO    | Flow run 'miniature-muskox' - Using task runner 'SequentialTaskRunner'
08:45:57.069 | WARNING | Flow run 'miniature-muskox' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
08:45:57.488 | INFO    | Flow run 'miniature-muskox' - Created task run 'slow_task_big_return-36221a7f-0' for task 'slow_task_big_return'
Exception in thread orion-log-worker:
Traceback (most recent call last):
  File "/usr/lib/python3.9/threading.py", line 954, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.9/threading.py", line 892, in run
    self._target(*self._args, **self._kwargs)
  File "/path/.venv/lib/python3.9/site-packages/prefect/logging/handlers.py", line 82, in _send_logs_loop
    anyio.run(self.send_logs)
  File "/path/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
    return asynclib.run(func, *args, **backend_options)
  File "/path/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/lib/python3.9/asyncio/runners.py", line 47, in run
    _cancel_all_tasks(loop)
  File "/usr/lib/python3.9/asyncio/runners.py", line 56, in _cancel_all_tasks
    to_cancel = tasks.all_tasks(loop)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 53, in all_tasks
    tasks = list(_all_tasks)
  File "/usr/lib/python3.9/_weakrefset.py", line 61, in __iter__
    for itemref in self.data:
RuntimeError: Set changed size during iteration
```
Moreover, the process hangs forever, and if you are on Orion Cloud, it creates a flow run that never finishes.
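The last frames of the traceback show `asyncio.all_tasks()` copying a `WeakSet` while something else adds to it (in CPython 3.9, `_all_tasks` is a module-level `WeakSet` shared by every event loop in the process). The sketch below reproduces the same `RuntimeError` from the same `_weakrefset.py` iterator in a single thread, and shows the usual guard of iterating over a snapshot. `FakeTask` and both helpers are hypothetical illustrations. In the real failure the mutation presumably comes from another thread (for example the flow's event loop creating tasks while the `orion-log-worker` thread shuts down its own loop), where even `list()` over a `WeakSet` is not atomic because `WeakSet.__iter__` is a Python generator. So this explains the error message; it is not a fix for Prefect.

```python
import weakref


class FakeTask:
    """Hypothetical stand-in for an asyncio Task: any weak-referenceable object works."""


def add_during_iteration(registry: weakref.WeakSet, keep_alive: list) -> str:
    """Add to a WeakSet while iterating over it directly; return the error message, if any."""
    try:
        for _ in registry:
            new_task = FakeTask()
            keep_alive.append(new_task)  # strong reference so the WeakSet entry survives
            registry.add(new_task)  # grows the underlying set mid-iteration
    except RuntimeError as exc:
        return str(exc)
    return "no error"


def add_during_snapshot_iteration(registry: weakref.WeakSet, keep_alive: list) -> int:
    """Iterate over a list copy instead, so adding to the WeakSet is safe; return its new size."""
    for _ in list(registry):
        new_task = FakeTask()
        keep_alive.append(new_task)
        registry.add(new_task)
    return len(registry)


if __name__ == "__main__":
    alive = [FakeTask() for _ in range(3)]
    print(add_during_iteration(weakref.WeakSet(alive), alive))  # Set changed size during iteration
    alive = [FakeTask() for _ in range(3)]
    print(add_during_snapshot_iteration(weakref.WeakSet(alive), alive))  # 6
```

The snapshot only helps when nothing can run between the copy's steps, which holds in this single-threaded example but not for a `WeakSet` shared across threads.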
a
thanks for moving the code block 🙏
m
And thank you for the fix suggestion, will try it out tomorrow!