scott
12/12/2022, 4:20 PM@task
functions called from a @flow
- all of our work is done outside of Python called via make commands, which I currently use shell_run_command
for. However, you can’t put shell_run_command
within a function decorated with @task
. I could just remove the @task
decorator. Is there a workflow folks use to leverage @task
decorator that run shell commands?Kevin Grismore
12/12/2022, 4:30 PMshell_run_command
itself is a task, it can't be called by another task. If it makes sense for some of your flow logic to be in its own function, like you said, you could do flow -> regular python function -> task shell_run_command
. The code in your python functions won't belong to any tasks, though, just the flow.
Another thing you could do is copy the shell_run_command
code into its own function in your project, without the task decorator, then call it from your flow's tasks, like flow -> task -> regular python function shell_run_command
scott
12/12/2022, 4:52 PMshell_run_command
that isn’t also a task? That would be ideal. However, it’d be nice if the prefect_shell
library had the function if possible. Maybe there’s not enough interest thoughPeyton Runyan
12/12/2022, 6:01 PMscott
12/12/2022, 6:03 PMshell_run_command
?scott
12/12/2022, 6:10 PMdef one_run(command, name, **kwargs):
return shell_run_command.with_options(name = name)(
command = command,
cwd = "/some/path",
return_all = True,
**kwargs)
def two_run(command, name, **kwargs):
return shell_run_command.with_options(name = name)(
command = command,
cwd = "/some/other/path",
return_all = True,
**kwargs)
@flow
def person():
x = one_run(
command = "make something",
name = "a name"
)
y = two_run(
command = "make something else",
name = "another name"
)
z = one_run(
command = "make another thing",
name = "a cool name"
)
return z
scott
12/12/2022, 6:11 PMPeyton Runyan
12/12/2022, 6:40 PMshell_run_command
directly?scott
12/12/2022, 6:44 PMcwd
command, also b/c it’s less code - not repeating shell_run_command
and all its inputs every time within @flow
Peyton Runyan
12/12/2022, 6:45 PMPeyton Runyan
12/12/2022, 7:21 PM@task
decorator from the copied code, you can make it a utility function and toss it in your flows. Here's a really basic example:
Here's the code you need to copy: https://github.com/PrefectHQ/prefect-shell/blob/main/prefect_shell/commands.py
import logging
import os
import sys
import tempfile
from typing import List, Optional, Union
from anyio import open_process
from anyio.streams.text import TextReceiveStream
from prefect import task, flow
from prefect.logging import get_run_logger
async def shell_run_command(
command: str,
env: Optional[dict] = None,
helper_command: Optional[str] = None,
shell: Optional[str] = None,
extension: Optional[str] = None,
return_all: bool = False,
stream_level: int = <http://logging.INFO|logging.INFO>,
cwd: Union[str, bytes, os.PathLike, None] = None,
) -> Union[List, str]:
logger = get_run_logger()
current_env = os.environ.copy()
current_env.update(env or {})
if shell is None:
# if shell is not specified:
# use powershell for windows
# use bash for other platforms
shell = "powershell" if sys.platform == "win32" else "bash"
extension = ".ps1" if shell.lower() == "powershell" else extension
tmp = tempfile.NamedTemporaryFile(prefix="prefect-", suffix=extension, delete=False)
try:
if helper_command:
tmp.write(helper_command.encode())
tmp.write(os.linesep.encode())
tmp.write(command.encode())
tmp.close()
shell_command = [shell, tmp.name]
lines = []
async with await open_process(
shell_command, env=current_env, cwd=cwd
) as process:
async for text in TextReceiveStream(process.stdout):
logger.log(level=stream_level, msg=text)
lines.extend(text.rstrip().split("\n"))
await process.wait()
if process.returncode:
stderr = "\n".join(
[text async for text in TextReceiveStream(process.stderr)]
)
if not stderr and lines:
stderr = f"{lines[-1]}\n"
msg = (
f"Command failed with exit code {process.returncode}:\n" f"{stderr}"
)
raise RuntimeError(msg)
finally:
if os.path.exists(tmp.name):
os.remove(tmp.name)
line = lines[-1] if lines else ""
return lines if return_all else line
@task(name="demo_name")
async def shell_task(word: str):
return await shell_run_command(
command=f"echo {word}",
return_all=True,
)
@flow
def shell_flow():
x = shell_task("Woof!")
y = shell_task("Meow")
print(x, y)
if __name__ == "__main__":
shell_flow()
Peyton Runyan
12/12/2022, 7:23 PMscott
12/12/2022, 7:33 PMscott
12/12/2022, 7:42 PMRam Vuppaladadiyam
01/17/2023, 10:43 PM.fn()
on shell_run_command
to access the underlying function of the task!