Hi there. I’m having some trouble getting paramete...
# prefect-community
k
Hi there. I’m having some trouble getting parameters to work on my flow comprised of ShellTasks. Is there anyway to get a parameter into a ShellTask? This doesn’t seem to work:
Copy code
with Flow("toy_flow") as flow:
    a = Parameter("a")
    b = Parameter("b")
    c = Parameter("c")

    job1_task = ShellTask(name="job1", command=f"""echo {a} {b} {c}""")
j
Hi Kevin, the trick here is that the
f
string you're setting to
command
runs at flow build time (where
a
,
b
,
c
are
Parameter
objects), but you want their actual values at runtime. One way to accomplish this would be to not pass
command
to the
ShellTask
constructor, and instead pass it to the task as a runtime parameter. Something like:
Copy code
from prefect.tasks.shell import ShellTask
from prefect import Flow, task, Parameter

@task
def build_command(a, b, c):
    return f"echo {a}, {b}, {c}"

shell = ShellTask()

with Flow("test") as flow:
    a = Parameter("a")
    b = Parameter("b")
    c = Parameter("c")

    command = build_command(a, b, c)
    output = shell(command=command)

state = flow.run(a=1, b=2, c=3)
print(state.result[output].result)
Copy code
[2020-06-09 17:59:05] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-06-09 17:59:05] INFO - prefect.FlowRunner | Starting flow run.
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'c': Starting task run...
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'c': finished task run for task with final state: 'Success'
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'a': Starting task run...
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'a': finished task run for task with final state: 'Success'
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'b': Starting task run...
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'b': finished task run for task with final state: 'Success'
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'build_command': Starting task run...
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'build_command': finished task run for task with final state: 'Success'
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'ShellTask': finished task run for task with final state: 'Success'
[2020-06-09 17:59:05] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
1, 2, 3
k
Thanks so much @Jim Crist-Harif - this is what I had originally, but it creates an additional task in the DAG. The additional clutter can make debugging difficult. Would it make sense to subclass ShellTask and accept arguments as kwargs in the constructor?
j
Sure. The builtin task library is there as useful tools to get started, but there's no mandate to use it as is. User-defined tasks are first-class in prefect, so if defining your own task feels easier then that's a valid solution.
this is what I had originally, but it creates an additional task in the DAG. The additional clutter can make debugging difficult.
Can you elaborate on this? It'd be useful to get some insight from a usability perspective.
k
I have a more complex example I can share when I’m at my computer, but I have 3 shelltasks that each take parameters. The 3 shelltasks run in series. The above approach gives and additional task for each shelltask in the graph - I was trying to collapse those 2 tasks down to one, because extracting parameters is not something I care to see in the dag.
j
So the issue here is more the web-ui display of the graph? You care about the shell tasks, having ancillary tasks for setting these up is distracting in the UI?
If your flow did still have those tasks, but you didn't see them in the display (or at least not by default), would that resolve your issue? And if so, how would you think about the ancillary tasks? Is the setup + shell task one larger task conceptually to you?
Sorry for the vague questions, thinking about how we can make the experience nicer.
k
here’s my more involved example:
Copy code
from prefect import task, Flow
from prefect.tasks.shell import ShellTask
from prefect import Parameter


@task
def build_job1_cmd(a, b, c):
    return f"""
        echo {a}, {b}, {c}
    """


@task
def build_job2_cmd(b):
    return f"""
        echo {b}
    """


@task
def build_job3_cmd():
    return f"""
        echo NULL
    """


with Flow("toy_flow") as flow:
    # parameters
    a = Parameter("a")
    b = Parameter("b")
    c = Parameter("c")

    # tasks
    job1_cmd = build_job1_cmd(a, b, c)
    job1_task = ShellTask(name="job1")(command=job1_cmd)

    job2_cmd = build_job2_cmd(b)
    job2_task = ShellTask(name="job2")(command=job2_cmd)

    job3_cmd = build_job3_cmd()
    job3_task = ShellTask(name="job3")(command=job3_cmd)

    # dependencies
    job2_task.set_upstream(job1_task)
    job3_task.set_upstream(job2_task)

flow.register()
and here is the resulting DAG graph:
So the issue here is more the web-ui display of the graph? You care about the shell tasks, having ancillary tasks for setting these up is distracting in the UI?
That’s correct. When you have a big data pipeline going and you need to debug a particular step quickly - you don’t want two nodes (tasks) that essentially represent a single step
If your flow did still have those tasks, but you didn’t see them in the display (or at least not by default), would that resolve your issue?
it would, but I think that might be confusing. It might be useful to have an
@shelltask
decorator, or some other way for
ShellTask
to accept a
Paramater
Is the setup + shell task one larger task conceptually to you?
Yes, it is
thanks so much for looking!