Jay Sundaram
04/08/2021, 3:08 PM
--destination_bucket_name <Parameter: destination_bucket_name>
when I was expecting:
--destination_bucket_name myorg-S3-bucket
Dylan
When you pass a Parameter as an argument to a Task, that parameter isn't populated until runtime.
Dylan
When you call flow.run() locally or an agent starts a flow run, parameters are defined and the code inside your tasks is executed.
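A minimal plain-Python sketch of why the placeholder text showed up in the command. FakeParameter and build_cmd are hypothetical stand-ins invented for illustration (this is not Prefect's Parameter class); the only assumption is that the placeholder's repr looks like the output reported above.

```python
class FakeParameter:
    """Hypothetical stand-in for a flow parameter; not Prefect's class."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return f"<Parameter: {self.name}>"


def build_cmd(bucket):
    # The f-string is formatted the moment this function is called.
    return f"myexecutable --destination_bucket_name {bucket}"


# Flow-definition time: only the placeholder exists, so its repr leaks in.
placeholder = FakeParameter("destination_bucket_name")
at_build_time = build_cmd(placeholder)

# Run time: the real value has been supplied, so the command is correct.
at_run_time = build_cmd("myorg-S3-bucket")

print(at_build_time)
print(at_run_time)
```

The same string formatting gives two different results depending on when it runs, which is why the command has to be built inside a task rather than directly in the flow block.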
Jay Sundaram
04/08/2021, 3:17 PM
def _execute_cmd(cmd):
    p = subprocess.Popen(cmd, shell=True)
    ...

@task()
def convert(destination_bucket_name):
    cmd = f"myexecutable --destination_bucket_name {destination_bucket_name}"
    _execute_cmd(cmd)

with Flow('myflow') as flow:
    destination_bucket_name = Parameter("destination_bucket_name", DEFAULT_DESTINATION_BUCKET_NAME)
    convert(destination_bucket_name)
Something like this.
Kevin Kho
You could use ShellTask to run that subprocess call: https://docs.prefect.io/api/latest/tasks/shell.html
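If the hand-written helper is kept instead of ShellTask, one possible shape for it is sketched below. This is an assumption, not the original helper (whose body is elided above): execute_cmd is a hypothetical name, and the Python interpreter stands in for myexecutable so the sketch runs anywhere. It passes the command as an argument list and fails loudly on a non-zero exit code.

```python
import subprocess
import sys


def execute_cmd(args):
    """Run a command given as a list of arguments and return its stdout."""
    # check=True raises CalledProcessError on a non-zero exit code.
    completed = subprocess.run(args, check=True, capture_output=True, text=True)
    return completed.stdout


# Demo: the interpreter plays the role of "myexecutable" and simply
# echoes back the arguments it received.
echo_script = "import sys; print(' '.join(sys.argv[1:]))"
output = execute_cmd(
    [sys.executable, "-c", echo_script, "--destination_bucket_name", "myorg-S3-bucket"]
)
print(output.strip())
```

Passing a list avoids shell quoting problems if the bucket name ever contains unusual characters, and waiting for the process means a failed command fails the calling task.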
Jay Sundaram
04/08/2021, 3:26 PM
Dylan
Jay Sundaram
04/08/2021, 3:28 PM
Dylan
set_upstream
Jay Sundaram
04/08/2021, 3:32 PM
@task()
def something():
    print("something")

my_shell_task = ShellTask()

with Flow('myflow') as flow:
    destination_bucket_name = Parameter("destination_bucket_name", DEFAULT_DESTINATION_BUCKET_NAME)
    cmd = f"myexecutable --destination_bucket_name {destination_bucket_name}"
    my_shell_task(command=cmd)
    something(upstream_tasks=[my_shell_task])
Kevin Kho
The Parameter has no value yet when the flow block builds cmd. This will work if you wrap cmd as a task. Example below:
Kevin Kho
@task
def create_cmd(destination_bucket_name):
    return f"myexecutable --destination_bucket_name {destination_bucket_name}"
Kevin Kho
cmd = create_cmd(destination_bucket_name)
But yes to the upstream task dependency being set there.
Kevin Kho
something()
something.set_upstream(my_shell_task)
Jay Sundaram
04/08/2021, 3:40 PM
something()
myobject = something.set_upstream(my_shell_task)
Is this correct?
Kevin Kho
something()
So I think this should be:
myobject = something()
myobject.set_upstream(my_shell_task)
Kevin Kho
Jay Sundaram
04/08/2021, 3:49 PM
NameError: name 'myobject' is not defined
🤔
Jay Sundaram
04/08/2021, 3:50 PM
TypeError: run() missing required argument: 'command'
Kevin Kho
Kevin Kho
import prefect
from prefect import Task
from prefect import task, Flow, Parameter
from prefect.tasks.shell import ShellTask

shell_task = ShellTask(
    helper_script="",
    shell="bash",
    log_stderr=True,
    return_all=True,
    stream_output=True,
)

@task
def get_command(param):
    return f"echo '{param}'"

@task
def other_task():
    return 2

with Flow(name="Test") as flow:
    param = Parameter('param')
    cmd = get_command(param)
    test = shell_task(command=cmd)
    something = other_task()
    something.set_upstream(test)

flow.run(parameters=dict(param='a'))
Kevin Kho
Jay Sundaram
04/08/2021, 4:03 PM
Kevin Kho
test in the code snippet
Jay Sundaram
04/08/2021, 4:07 PM
Kevin Kho
Think of test as something to hold the state of the shell_task. We are setting test as an upstream dependency, meaning we want to see the STATE of shell_task be SUCCESS before other_task runs. If we don’t have any dependency requirements, we don’t need to store the STATE of shell_task into the test variable because we don’t need to track it. Does this help?
Jay Sundaram
04/08/2021, 4:12 PM
Jay Sundaram
04/08/2021, 4:14 PM
logger.info(f"command is: '{cmd}'")
is yielding this in the UI log:
command is: '<Task: create_cmd>'
Kevin Kho
Dylan
Jay Sundaram
04/08/2021, 4:23 PM
Jay Sundaram
04/08/2021, 4:23 PM
Dylan
Does test from the example ensure that the shell task ran before other_task?
Jay Sundaram
04/08/2021, 4:28 PM
test, so I was just asking: if we remove assigning the invocation of shell_task() to test, what would we pass to set_upstream()?
Seems like the assignment should be preserved in the code
test = shell_task()
so that there is something to pass to set_upstream()
Kevin Kho
test = shell_task() holds both STATE and the RESULT. Setting upstream dependencies like that uses the STATE, but not the RESULT, so we still need test to hold the STATE.
Kevin Kho
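The state-only dependency discussed above can be illustrated without Prefect. The sketch below is a plain-Python analogy using the standard library's graphlib, not how Prefect schedules tasks internally; shell_step, other_step, and the upstream mapping are hypothetical names. The single edge plays the role of something.set_upstream(test): it forces ordering but passes no data.

```python
from graphlib import TopologicalSorter

run_order = []


def shell_step():
    run_order.append("shell_step")
    return "shell output"


def other_step():
    # Takes no input from shell_step; it only has to run after it.
    run_order.append("other_step")
    return 2


steps = {"shell_step": shell_step, "other_step": other_step}

# Maps each step to the set of steps that must finish before it starts.
upstream = {"shell_step": set(), "other_step": {"shell_step"}}

results = {}
for name in TopologicalSorter(upstream).static_order():
    results[name] = steps[name]()

print(run_order)
```

Without the edge, nothing ties the two steps together, which is why a handle such as test is needed: it is the thing the dependency points at.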