What does it mean when on invoking a system call f...
# ask-community
j
What does it mean when, on invoking a system call from within a task, a <Parameter> is used (?) instead of the expected string?
--destination_bucket_name <Parameter: destination_bucket_name>
when I was expecting:
--destination_bucket_name myorg-S3-bucket
d
Hi @Jay Sundaram! Can you share a short reproducible example?
Execution within tasks is deferred to runtime
So, if you’re passing a Parameter as an argument to a Task, that parameter isn’t populated until runtime, either
When the script is loaded (a moment we sometimes call “build” or “definition” time) none of the code inside your tasks is run
This time is used for Prefect to figure out which tasks depend on each other and to define a dependency graph
Then, when you call flow.run() locally or an agent starts a flow run, parameters are defined and the code inside your tasks is executed
This sounds like you may be printing something at script definition instead of run time
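To make the definition-time versus run-time distinction concrete, here is a minimal sketch in plain Python. It does not use Prefect: ToyParameter and build_command are hypothetical stand-ins, and the bucket name is taken from the question above. It only shows why formatting a placeholder object into a string before it has a value produces text like <Parameter: destination_bucket_name>.

```python
class ToyParameter:
    """Hypothetical stand-in for a workflow parameter; not Prefect's class."""

    def __init__(self, name, default=None):
        self.name = name
        self.default = default

    def __repr__(self):
        # A placeholder has no value yet, so it can only describe itself.
        return f"<Parameter: {self.name}>"


def build_command(bucket):
    return f"myexecutable --destination_bucket_name {bucket}"


param = ToyParameter("destination_bucket_name", default="myorg-S3-bucket")

# Definition time: the placeholder object itself is formatted into the string.
definition_time_cmd = build_command(param)

# Run time: the placeholder has been resolved to a concrete value first.
run_time_cmd = build_command(param.default)

print(definition_time_cmd)
print(run_time_cmd)
```

The first printed line contains the placeholder's repr, the second contains the real bucket name.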
j
import subprocess

from prefect import task, Flow, Parameter

def _execute_cmd(cmd):
    p = subprocess.Popen(cmd, shell=True)
    ...

@task()
def convert(destination_bucket_name):
    cmd = f"myexecutable --destination_bucket_name {destination_bucket_name}"
    _execute_cmd(cmd)


with Flow('myflow') as flow:
    destination_bucket_name = Parameter("destination_bucket_name", DEFAULT_DESTINATION_BUCKET_NAME)
    convert(destination_bucket_name)
Something like this.
k
Hey @Jay Sundaram, it might help you to use our ShellTask to run that subprocess call: https://docs.prefect.io/api/latest/tasks/shell.html
d
That was going to be my suggestion 😄
k
This should defer everything to run time and help you use the Parameter
j
How do I ensure downstream tasks wait for the completion of the shell task?
d
There are two ways to accomplish this
If a task returns something, passing that result to the next task will create the dependency automatically
You can also explicitly set dependencies between tasks
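As a rough illustration of those two kinds of dependency, the sketch below models a flow as a plain dependency graph. ToyFlow and its add method are hypothetical and are not Prefect's API; the ordering uses graphlib from the Python standard library (3.9+). An edge is recorded either because a task consumes another task's result (inputs) or because it is declared explicitly (upstream_tasks).

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


class ToyFlow:
    """Hypothetical flow: records which tasks must finish before each task."""

    def __init__(self):
        self.upstream = {}  # task name -> set of upstream task names

    def add(self, name, inputs=(), upstream_tasks=()):
        # inputs: tasks whose results are consumed (implicit dependency)
        # upstream_tasks: tasks that must merely finish first (explicit dependency)
        self.upstream[name] = set(inputs) | set(upstream_tasks)
        return name

    def order(self):
        # Any valid run order places every task after all of its upstreams.
        return list(TopologicalSorter(self.upstream).static_order())


flow = ToyFlow()
cmd = flow.add("create_cmd")
shell = flow.add("shell_task", inputs=[cmd])   # depends on create_cmd's result
flow.add("something", upstream_tasks=[shell])  # no data passed, order only

run_order = flow.order()
print(run_order)
```

Both kinds of edge end up in the same graph, so the scheduler treats them identically when deciding what may run next.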
j
The shell task will execute a command-line executable, so nothing is really returned.
d
Check out the set_upstream methods in that doc
j
So something like this it looks like. Perhaps?
@task()
def something():
    print("something")

my_shell_task = ShellTask()

with Flow('myflow') as flow:
    destination_bucket_name = Parameter("destination_bucket_name", DEFAULT_DESTINATION_BUCKET_NAME)
    cmd = f"myexecutable --destination_bucket_name {destination_bucket_name}"
    my_shell_task(command=cmd)
    something(upstream_tasks=[my_shell_task])
k
I don’t think this will work because the Parameter is not defined when you define cmd. This will work if you wrap cmd as a task. Example below:
@task
def create_cmd(destination_bucket_name):
    # Runs at flow run time, so the Parameter has already been resolved to a string.
    return f"myexecutable --destination_bucket_name {destination_bucket_name}"
then do cmd = create_cmd(destination_bucket_name). But yes to the upstream task dependency being set there
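The idea of wrapping the string formatting in a task can be sketched without Prefect as a tiny lazy-evaluation model. Deferred and ToyParameter below are hypothetical classes, not Prefect's; they only illustrate that the wrapped function is stored at definition time and called once the parameter values are known.

```python
class Deferred:
    """Hypothetical lazy node: stores a function and its inputs, runs later."""

    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args

    def run(self, parameters):
        # Resolve any lazy inputs first, then call the wrapped function.
        resolved = [
            a.run(parameters) if isinstance(a, Deferred) else a
            for a in self.args
        ]
        return self.fn(*resolved)


class ToyParameter(Deferred):
    """Hypothetical parameter node: looks its value up at run time."""

    def __init__(self, name):
        self.name = name

    def run(self, parameters):
        return parameters[self.name]


def create_cmd(destination_bucket_name):
    return f"myexecutable --destination_bucket_name {destination_bucket_name}"


bucket = ToyParameter("destination_bucket_name")
cmd = Deferred(create_cmd, bucket)  # definition time: nothing is formatted yet

# Run time: parameter values are supplied and the string is built.
result = cmd.run({"destination_bucket_name": "myorg-S3-bucket"})
print(result)
```

Because create_cmd is only called inside run, the f-string sees the real bucket name rather than the placeholder object.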
Actually sorry. The way to do upstream tasks is like this:
something()
something.set_upstream(my_shell_task)
j
and if something returns an object:
something()
myobject = something.set_upstream(my_shell_task)
is this correct?
k
i think you want myobject to get something from something(), so i think this should be
myobject = something()
myobject.set_upstream(my_shell_task)
I think this is a good snippet to illustrate. Check the code block
j
NameError: name 'myobject' is not defined
🤔
also:
TypeError: run() missing required argument: 'command'
k
Let me make an example for you
import prefect
from prefect import Task
from prefect import task, Flow, Parameter
from prefect.tasks.shell import ShellTask

shell_task = ShellTask(
    helper_script="",
    shell="bash",
    log_stderr=True,
    return_all=True,
    stream_output=True,
)

@task
def get_command(param):
    return f"echo '{param}'"

@task
def other_task():
    return 2

with Flow(name="Test") as flow:
    param = Parameter('param')
    cmd = get_command(param)
    test = shell_task(command=cmd)

    something = other_task()
    something.set_upstream(test)

flow.run(parameters=dict(param='a'))
I think you weren’t passing the command to the shell task during runtime
j
The mistake was that I had passed shell_task itself to .set_upstream() instead of shell_task's return context(?). What is "test" in this case?
k
Something to hold the result of shell_task but let me find someone who can give a better answer
If you’re not setting an upstream to the shell task, you don’t need test in the code snippet
j
Hmm. Don't understand that statement. I want to make sure that downstream tasks wait for the shell command to finish execution.
k
Sorry was unclear. Think of test as something to hold the state of the shell_task. We are setting test as an upstream dependency meaning we want to see the STATE of shell_task be SUCCESS before other_task runs. If we don’t have any dependency requirements, we don’t need to store the STATE of shell_task into the test variable because we don’t need to track it. Does this help?
j
hmm- so, what do you pass as parameter to something.set_upstream() to ensure that something waits for shell_task to complete?
also, this:
logger.info(f"command is: '{cmd}'")
is yielding this in the UI log:
command is: '<Task: create_cmd>'
k
You want the logger call to happen inside a task (see the doc). This won’t work because the Task hasn’t run yet.
On your other question, I’ll get someone else to respond
d
Hey @Jay Sundaram! It would really help us out if you can show us longer code snippets when you’re referencing specific log outputs, like here: https://prefect-community.slack.com/archives/CL09KU1K7/p1617898495246400?thread_ts=1617894495.235500&cid=CL09KU1K7
Also, we have dedicated professional services options if you need some help getting up and running with Prefect at your organization. Email us at sales@prefect.io for more details
j
yup at the time he posted it. thanks
yup i've pinged a team member about the support services
d
did passing test from the example ensure that the shell task ran before the other_task?
j
yeah. the recommendation was to remove test, so i was just asking: if we remove assigning the invocation of shell_task() to test, what would we pass to set_upstream()?
seems like the assignment test = shell_task() should be preserved in the code so that there is something to pass to set_upstream()
k
test = shell_task() holds both the STATE and the RESULT. Setting upstream dependencies like that uses the STATE, but not the RESULT, so we still need test to hold the STATE, even if it doesn’t return anything (Result is None)