Kevin Weiler
06/09/2020, 5:38 PM
with Flow("toy_flow") as flow:
    a = Parameter("a")
    b = Parameter("b")
    c = Parameter("c")
    job1_task = ShellTask(name="job1", command=f"""echo {a} {b} {c}""")
Jim Crist-Harif
06/09/2020, 5:58 PM
The f-string you're setting to command is evaluated at flow build time (when a, b, and c are still Parameter objects), but you want their actual values at runtime.
One way to accomplish this would be to not pass command to the ShellTask constructor, and instead pass it to the task as a runtime parameter. Something like:
from prefect.tasks.shell import ShellTask
from prefect import Flow, task, Parameter

@task
def build_command(a, b, c):
    return f"echo {a}, {b}, {c}"

shell = ShellTask()

with Flow("test") as flow:
    a = Parameter("a")
    b = Parameter("b")
    c = Parameter("c")
    command = build_command(a, b, c)
    output = shell(command=command)

state = flow.run(a=1, b=2, c=3)
print(state.result[output].result)
[2020-06-09 17:59:05] INFO - prefect.FlowRunner | Beginning Flow run for 'test'
[2020-06-09 17:59:05] INFO - prefect.FlowRunner | Starting flow run.
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'c': Starting task run...
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'c': finished task run for task with final state: 'Success'
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'a': Starting task run...
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'a': finished task run for task with final state: 'Success'
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'b': Starting task run...
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'b': finished task run for task with final state: 'Success'
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'build_command': Starting task run...
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'build_command': finished task run for task with final state: 'Success'
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'ShellTask': Starting task run...
[2020-06-09 17:59:05] INFO - prefect.TaskRunner | Task 'ShellTask': finished task run for task with final state: 'Success'
[2020-06-09 17:59:05] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
1, 2, 3
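The build-time versus runtime distinction described above can be reproduced without Prefect at all. The following is a minimal sketch, assuming a hypothetical FakeParameter stand-in class (it is not Prefect's Parameter and only imitates the idea of a placeholder whose value arrives later). It shows that an f-string written inline captures the placeholder's repr immediately, while the same template inside a function sees the real values once they are passed in.

```python
class FakeParameter:
    """Hypothetical stand-in for a flow parameter: a placeholder
    whose real value is only supplied when the flow runs."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"<Parameter: {self.name}>"


def build_command(a, b, c):
    # Called at run time with concrete values, so the f-string sees real data.
    return f"echo {a}, {b}, {c}"


a, b, c = FakeParameter("a"), FakeParameter("b"), FakeParameter("c")

# Build time: the f-string is evaluated right here and bakes in the
# placeholder objects' repr, not the values supplied later.
build_time_command = f"echo {a}, {b}, {c}"

# Run time: the same template is evaluated once real values are available.
run_time_command = build_command(1, 2, 3)

print(build_time_command)
print(run_time_command)
```

Deferring the string formatting into a function is the same move the build_command task makes in the flow above: the formatting happens only when the values exist.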
Kevin Weiler
06/09/2020, 7:09 PM
Jim Crist-Harif
06/09/2020, 7:11 PM
> this is what I had originally, but it creates an additional task in the DAG. The additional clutter can make debugging difficult.
Can you elaborate on this? It'd be useful to get some insight from a usability perspective.
Kevin Weiler
06/09/2020, 7:20 PM
Jim Crist-Harif
06/09/2020, 7:22 PM
Kevin Weiler
06/09/2020, 9:55 PM
from prefect import task, Flow
from prefect.tasks.shell import ShellTask
from prefect import Parameter

@task
def build_job1_cmd(a, b, c):
    return f"""
    echo {a}, {b}, {c}
    """

@task
def build_job2_cmd(b):
    return f"""
    echo {b}
    """

@task
def build_job3_cmd():
    return f"""
    echo NULL
    """

with Flow("toy_flow") as flow:
    # parameters
    a = Parameter("a")
    b = Parameter("b")
    c = Parameter("c")

    # tasks
    job1_cmd = build_job1_cmd(a, b, c)
    job1_task = ShellTask(name="job1")(command=job1_cmd)
    job2_cmd = build_job2_cmd(b)
    job2_task = ShellTask(name="job2")(command=job2_cmd)
    job3_cmd = build_job3_cmd()
    job3_task = ShellTask(name="job3")(command=job3_cmd)

    # dependencies
    job2_task.set_upstream(job1_task)
    job3_task.set_upstream(job2_task)

flow.register()
> So the issue here is more the web-ui display of the graph? You care about the shell tasks, having ancillary tasks for setting these up is distracting in the UI?
That's correct. When you have a big data pipeline going and you need to debug a particular step quickly, you don't want two nodes (tasks) that essentially represent a single step.
> If your flow did still have those tasks, but you didn't see them in the display (or at least not by default), would that resolve your issue?
It would, but I think that might be confusing. It might be useful to have an @shelltask decorator, or some other way for ShellTask to accept a Parameter.
> Is the setup + shell task one larger task conceptually to you?
Yes, it is.
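For illustration only, here is a rough sketch of what the @shelltask idea could look like in plain Python. The shelltask decorator below is hypothetical and is not a Prefect API. It assumes the goal is simply to fold "build the command" and "run the command" into one callable, so that one function corresponds to one step. It uses only the standard library's subprocess module.

```python
import functools
import subprocess


def shelltask(build_command):
    """Hypothetical decorator: wrap a command-building function so that
    calling it builds the command and runs it in the shell as one step."""

    @functools.wraps(build_command)
    def run(*args, **kwargs):
        # Format the command only now, when the real values are known.
        command = build_command(*args, **kwargs)
        completed = subprocess.run(
            command,
            shell=True,
            check=True,  # raise CalledProcessError on a non-zero exit code
            capture_output=True,
            text=True,
        )
        return completed.stdout.strip()

    return run


@shelltask
def job1(a, b, c):
    return f"echo {a}, {b}, {c}"


@shelltask
def job2(b):
    return f"echo {b}"


if __name__ == "__main__":
    print(job1(1, 2, 3))
    print(job2("hello"))
```

In this sketch each decorated function is a single unit from the caller's point of view, which matches the "one larger task conceptually" view above. Values are interpolated into the shell command unquoted, so anything untrusted would need shlex.quote or similar before being used this way.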