
Santiago Gonzalez

05/18/2023, 1:36 PM
Hi, I’ve been testing a ShellOperation task:
from prefect import flow, get_run_logger
from prefect_shell import ShellOperation


@flow
def demo_flow():
    with ShellOperation(commands=['sleep 1m', 'echo "hello world"']) as shell_task:
        process = shell_task.trigger()
        process.wait_for_completion()
        output = process.fetch_result()
        get_run_logger().info(f'Output: "{output}"')


if __name__ == '__main__':
    demo_flow()
But it is failing with this error:
10:31:35.447 | ERROR   | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b'hello world\n')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b'hello world\n')>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/subprocess.py", line 73, in pipe_data_received
    reader.feed_data(data)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/streams.py", line 472, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Versions: prefect 2.10.9, prefect-shell 0.1.5, Python 3.8.10

Austin Weisgrau

05/18/2023, 3:58 PM
FWIW, I've also had trouble getting prefect-shell to work at a minimal level and have fallen back to using Python's subprocess module directly.
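A minimal sketch of that kind of fallback, assuming a POSIX shell is available. The helper name run_shell is hypothetical and not part of Prefect or prefect-shell; the function could be called from inside a flow or task as ordinary Python:

```python
import subprocess


def run_shell(commands):
    """Run each command through the shell, in order, and return the combined stdout."""
    outputs = []
    for command in commands:
        # check=True raises CalledProcessError if the command exits non-zero.
        result = subprocess.run(
            command, shell=True, check=True, capture_output=True, text=True
        )
        outputs.append(result.stdout)
    return "".join(outputs)


if __name__ == "__main__":
    print(run_shell(["sleep 1", 'echo "hello world"']))
```

This blocks until every command finishes, so it sidesteps the asyncio stream handling in the traceback above, at the cost of not streaming output while a command is still running.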

Santiago Gonzalez

05/18/2023, 7:20 PM
I’ve found a solution anyway, using the shell_run_command task.

Pedro Machado

05/18/2023, 8:24 PM
I am having the same issue with DbtCoreOperation. I am able to run it with the run() method, but the docs recommend using the context manager with trigger() for long-running tasks. It looks like a bug to me, but I can't tell for sure.