Wade Glaser
12/21/2023, 10:12 PM@flow
async def flow_function():
await my_function()
time.sleep(10) #to ensure the CLI commands in my_function finish
await my_tasks()
This pared down version of my_function which uses ShellOperation (Edited for privacy, but I assure you the actual command line command I use works!)
from prefect_shell import ShellOperation
from prefect_shell.commands import ShellProcess
import anyio
import asyncio
import subprocess
import time
async def my_function():
process: ShellProcess = await ShellOperation(
commands=[
f"<command that prompts for a password on the command line using stdin>"
],
stream_output=True,
).trigger()
time.sleep(5) #make sure the password prompt shows up.
await process._process.stdin.send(bytes("<password>\n", encoding="UTF-8")
)
Nate
12/22/2023, 12:27 AMWade Glaser
12/22/2023, 2:28 PMNate
12/22/2023, 2:54 PMMike Watson
12/22/2023, 3:55 PMNate
12/22/2023, 3:59 PMMike Watson
12/22/2023, 4:28 PM