mark.smith
07/16/2021, 1:14 AMKevin Kho
mark.smith
07/16/2021, 2:37 PM#--------------------------------------------------------------
# Imports
#--------------------------------------------------------------
# basic imports
from prefect import Flow, Parameter, task
# specific task class imports
from prefect.tasks.shell import ShellTask
#--------------------------------------------------------------
# Define custom task functions
#--------------------------------------------------------------
@task
def plus_one(x):
"""A task that adds 1 to a number"""
return x + 1
@task
def build_command(name):
return 'echo "HELLO, {}!"'.format(name)
@task
def print_it(x):
print(x)
#--------------------------------------------------------------
# Instantiate task classes
#--------------------------------------------------------------
run_in_cmd = ShellTask(name='run a command', shell='cmd', return_all=True)
#--------------------------------------------------------------
# Open a Flow context and use the functional API (if possible)
#--------------------------------------------------------------
with Flow('Best Practices') as flow:
# store the result of each task call, even if you don't use the result again
two = plus_one(1)
# for clarity, call each task on its own line
name = Parameter('name')
cmd = build_command(name=name)
shell_result = run_in_cmd(command=cmd)
print_it(shell_result)
# use the imperative API where appropriate
#shell_result.set_upstream(two)
flow.run(parameters=dict(name='mark'))
Kevin Kho