# prefect-community
s
I’d like to follow a workflow with Prefect 2.0 of separate `@task` functions called from a `@flow`. All of our work is done outside of Python, called via `make` commands, which I currently run with `shell_run_command`. However, you can’t call `shell_run_command` within a function decorated with `@task`. I could just remove the `@task` decorator. Is there a workflow folks use to keep the `@task` decorator on functions that run shell commands?
k
Right, since `shell_run_command` is itself a task, it can't be called by another task. If it makes sense for some of your flow logic to live in its own function, like you said, you could go flow -> regular Python function -> task `shell_run_command`. The code in your Python functions won't belong to any task then, though, just to the flow. Another option is to copy the `shell_run_command` code into its own function in your project, without the task decorator, and call it from your flow's tasks: flow -> task -> regular Python function `shell_run_command`.
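The second pattern only needs the shell runner to be an ordinary function. The block below is a minimal sketch of such a helper under stated assumptions. `run_shell_command` is a hypothetical name, and this is a stdlib-only port, not prefect-shell's code. It swaps prefect-shell's anyio streaming and temporary script file for a single `subprocess.run` call with `bash -c` (or `powershell -Command` on Windows). As a result, output is captured and returned rather than streamed to the Prefect logger.

```python
import os
import subprocess
import sys
from typing import Dict, List, Optional, Union


def run_shell_command(
    command: str,
    cwd: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    return_all: bool = False,
) -> Union[List[str], str]:
    # Hypothetical stdlib-only helper: a plain function, NOT a Prefect task,
    # so it can be called from inside any @task.
    # Like prefect-shell, default to PowerShell on Windows and bash elsewhere.
    shell = ["powershell", "-Command"] if sys.platform == "win32" else ["bash", "-c"]

    # Extend (rather than replace) the current environment with extra variables.
    full_env = {**os.environ, **(env or {})}

    result = subprocess.run(
        [*shell, command],
        cwd=cwd,
        env=full_env,
        capture_output=True,
        text=True,
    )
    lines = result.stdout.rstrip().splitlines()

    if result.returncode:
        # Mirror prefect-shell's error: prefer stderr, else the last stdout line.
        stderr = result.stderr or (f"{lines[-1]}\n" if lines else "")
        raise RuntimeError(
            f"Command failed with exit code {result.returncode}:\n{stderr}"
        )

    # return_all=True gives every output line; otherwise only the last one.
    if return_all:
        return lines
    return lines[-1] if lines else ""
```

Inside a task, this would be called as `run_shell_command("make something", cwd="/some/path")`. The trade-off against copying the real implementation is that command output only reaches the Prefect logs if the calling task logs it itself.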
s
Thanks. The second thing you suggest is basically a version of `shell_run_command` that isn’t also a task? That would be ideal. However, it’d be nice if the `prefect_shell` library provided that function itself. Maybe there’s not enough interest, though.
p
Hey Scott, do you have an example of a function that you would like to run the `shell_run_command` task inside of? Also, depending on a few factors, you could use subflows instead. I do think the easiest option is probably for you to copy our code 1:1 into a utility function and call it from within your custom tasks.
s
By “copy our code”, do you mean the code underlying `shell_run_command`?
An example:
```python
from prefect import flow
from prefect_shell import shell_run_command


def one_run(command, name, **kwargs):
    # Run `command` as a renamed shell_run_command task in /some/path
    return shell_run_command.with_options(name=name)(
        command=command,
        cwd="/some/path",
        return_all=True,
        **kwargs,
    )


def two_run(command, name, **kwargs):
    # Same as one_run, but in /some/other/path
    return shell_run_command.with_options(name=name)(
        command=command,
        cwd="/some/other/path",
        return_all=True,
        **kwargs,
    )


@flow
def person():
    x = one_run(command="make something", name="a name")
    y = two_run(command="make something else", name="another name")
    z = one_run(command="make another thing", name="a cool name")

    return z
```
I have used subflows in some flows; that’s a good idea.
p
Hmm, why are you making those their own functions when you're just calling `shell_run_command` directly?
s
Because there are many tasks that run with the same `cwd`, and because it’s less code: I’m not repeating `shell_run_command` and all its inputs every time within the `@flow`.
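Binding a shared `cwd` once is exactly what `functools.partial` does, so per-directory wrappers don't have to be written by hand. The block below is a minimal sketch that assumes a plain (non-task) shell runner. The `run` function is a hypothetical stand-in for `shell_run_command` built on `subprocess`, and two scratch directories stand in for `/some/path` and `/some/other/path`.

```python
import functools
import subprocess
import tempfile
from typing import List, Union


def run(command: str, cwd: str, return_all: bool = False) -> Union[List[str], str]:
    # Hypothetical plain shell runner standing in for shell_run_command.
    result = subprocess.run(
        ["bash", "-c", command], cwd=cwd, capture_output=True, text=True, check=True
    )
    lines = result.stdout.rstrip().splitlines()
    if return_all:
        return lines
    return lines[-1] if lines else ""


# Scratch directories standing in for "/some/path" and "/some/other/path".
some_path = tempfile.mkdtemp()
some_other_path = tempfile.mkdtemp()

# Fix cwd and return_all once per directory; callers only pass the command.
one_run = functools.partial(run, cwd=some_path, return_all=True)
two_run = functools.partial(run, cwd=some_other_path, return_all=True)

one_run("touch made_here.txt")
print(one_run("ls"))  # ['made_here.txt']
print(two_run("ls"))  # []
```

`functools.partial` only fixes arguments. Scott's wrappers also pass a per-call `name` to `with_options`, and that part would still need a small wrapper function like `one_run` above.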
p
brb in a meeting
But yeah, if you literally just copy all of the code in that file and then remove the `@task` decorator from the copied code, you can make it a utility function and toss it in your flows. Here's the code you need to copy: https://github.com/PrefectHQ/prefect-shell/blob/main/prefect_shell/commands.py
And here's a really basic example:
```python
import logging
import os
import sys
import tempfile
from typing import List, Optional, Union

from anyio import open_process
from anyio.streams.text import TextReceiveStream
from prefect import task, flow
from prefect.logging import get_run_logger


# Copied from prefect_shell/commands.py with the @task decorator removed,
# so this is a plain async utility function that our own tasks can await.
async def shell_run_command(
    command: str,
    env: Optional[dict] = None,
    helper_command: Optional[str] = None,
    shell: Optional[str] = None,
    extension: Optional[str] = None,
    return_all: bool = False,
    stream_level: int = logging.INFO,
    cwd: Union[str, bytes, os.PathLike, None] = None,
) -> Union[List, str]:
    logger = get_run_logger()

    current_env = os.environ.copy()
    current_env.update(env or {})

    if shell is None:
        # if shell is not specified:
        # use powershell for windows
        # use bash for other platforms
        shell = "powershell" if sys.platform == "win32" else "bash"

    extension = ".ps1" if shell.lower() == "powershell" else extension

    tmp = tempfile.NamedTemporaryFile(prefix="prefect-", suffix=extension, delete=False)
    try:
        if helper_command:
            tmp.write(helper_command.encode())
            tmp.write(os.linesep.encode())
        tmp.write(command.encode())
        tmp.close()

        shell_command = [shell, tmp.name]

        lines = []
        async with await open_process(
            shell_command, env=current_env, cwd=cwd
        ) as process:
            # Stream stdout to the run logger while collecting the lines
            async for text in TextReceiveStream(process.stdout):
                logger.log(level=stream_level, msg=text)
                lines.extend(text.rstrip().split("\n"))

            await process.wait()
            if process.returncode:
                stderr = "\n".join(
                    [text async for text in TextReceiveStream(process.stderr)]
                )
                if not stderr and lines:
                    stderr = f"{lines[-1]}\n"
                msg = (
                    f"Command failed with exit code {process.returncode}:\n" f"{stderr}"
                )
                raise RuntimeError(msg)
    finally:
        if os.path.exists(tmp.name):
            os.remove(tmp.name)

    line = lines[-1] if lines else ""
    return lines if return_all else line


@task(name="demo_name")
async def shell_task(word: str):
    # Our own task awaits the plain utility function
    return await shell_run_command(
        command=f"echo {word}",
        return_all=True,
    )

@flow
def shell_flow():
    x = shell_task("Woof!")
    y = shell_task("Meow")
    print(x, y)

if __name__ == "__main__":
    shell_flow()
```
s
nice!
thanks for your help
r
To follow up here: instead of copying the function into your repo and maintaining it over time, you could call `shell_run_command.fn(...)` to access the task's underlying function directly!
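Prefect 2's task objects keep the function they wrap on a `fn` attribute. That is why `.fn` works: calling it runs the plain function without going through the task machinery. The toy decorator below is a hypothetical stand-in written only to illustrate that one behavior; it is not Prefect's `Task` class. Calling the decorated object goes through the wrapper, while `.fn` reaches the undecorated coroutine function, which still has to be awaited.

```python
import asyncio
from typing import Any, Callable, List, Union


class ToyTask:
    """Hypothetical stand-in for a task object; NOT Prefect's Task class."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        # Keep the undecorated function on `.fn`, as Prefect's tasks do.
        self.fn = fn
        self.calls = 0

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Stand-in for the orchestration a real task call would add.
        self.calls += 1
        return self.fn(*args, **kwargs)


@ToyTask
async def shell_run_command(command: str, return_all: bool = False) -> Union[List[str], str]:
    # Pretend shell runner: echoes the command instead of executing it.
    lines = [f"ran: {command}"]
    return lines if return_all else lines[-1]


async def my_task_body() -> str:
    # Inside your own async task: call the underlying function and await it.
    return await shell_run_command.fn(command="make something")


print(asyncio.run(my_task_body()))  # ran: make something
print(shell_run_command.calls)      # 0, because the wrapper was bypassed
```

With the real library, the call inside an async `@task` would presumably look like `await shell_run_command.fn(command="make something", cwd="/some/path", return_all=True)`, assuming the installed prefect-shell matches the async code above. Because that function calls `get_run_logger()`, it should only be called from inside a flow or task run.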