Santiago Gonzalez
05/18/2023, 1:36 PM
from prefect import flow, get_run_logger
from prefect_shell import ShellOperation

@flow
def demo_flow():
    with ShellOperation(commands=['sleep 1m', 'echo "hello world"']) as shell_task:
        process = shell_task.trigger()
        process.wait_for_completion()
        output = process.fetch_result()
        get_run_logger().info(f'Output: "{output}"')

if __name__ == '__main__':
    demo_flow()
But it is failing, with this error:
10:31:35.447 | ERROR | asyncio - Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b'hello world\n')
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b'hello world\n')>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/subprocess.py", line 73, in pipe_data_received
    reader.feed_data(data)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/streams.py", line 472, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
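For readers unfamiliar with this assertion: it comes from asyncio's StreamReader, which refuses new data once the stream has been marked as finished. The sketch below reproduces only that assertion using the standard library. It does not involve Prefect and is not a diagnosis of why the subprocess output arrived late in the flow above. The function name feed_after_eof is made up for illustration.

```python
import asyncio


async def feed_after_eof() -> str:
    # Create the reader inside a coroutine so it binds to the running event loop.
    reader = asyncio.StreamReader()
    reader.feed_eof()  # mark the stream as finished
    try:
        # Data arriving after EOF trips the same assertion seen in the traceback.
        reader.feed_data(b"hello world\n")
    except AssertionError as exc:
        return str(exc)
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(feed_after_eof()))
```

With assertions enabled (the default, i.e. not running Python with -O), the returned message should match the last line of the traceback.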
prefect: 2.10.9
prefect-shell: 0.1.5
python: 3.8.10
Austin Weisgrau
05/18/2023, 3:58 PM
Santiago Gonzalez
05/18/2023, 7:20 PM
shell_run_command task
Pedro Machado
05/18/2023, 8:24 PM
DbtCoreOperation
I am able to run it with the run() method, but the docs recommend using the context manager with trigger() for long-running tasks. It looks like a bug to me, but I can't tell for sure.