Kyle McChesney
08/03/2021, 4:05 PM.map
and include the mapped value into either the command or the helper_script. For example:
@task
def files():
return [
'/opt/file1.txt',
'/opt/file2.txt',
'/opt/file3.txt',
'/opt/file4.txt',
]
rm_task = ShellTask(
command='rm $file',
)
with Flow('shell') as flow:
files_to_delete = files()
rm_task.map(files_to_delete, helper_script='file="{mapped_value}"')
Kevin Kho
from prefect.utilities.tasks import task
from prefect import task, Flow
from prefect.tasks.shell import ShellTask
@task
def files():
return [
'/opt/file1.txt',
'/opt/file2.txt',
'/opt/file3.txt',
'/opt/file4.txt',
]
@task
def rm_task(file_name):
return ShellTask(command=f'echo {file_name}').run()
with Flow('shell') as flow:
files_to_delete = files()
rm_task.map(files_to_delete)
flow.run()
Kevin Kho
from prefect.utilities.tasks import task
from prefect import task, Flow
from prefect.tasks.shell import ShellTask
@task
def files():
return [
'/opt/file1.txt',
'/opt/file2.txt',
'/opt/file3.txt',
'/opt/file4.txt',
]
@task
def format_commands(file_name):
return f'echo "{file_name}"'
rm_task = ShellTask()
with Flow('shell') as flow:
files_to_delete = files()
commands = format_commands.map(files_to_delete)
rm_task.map(command=commands)
flow.run()
Kyle McChesney
08/03/2021, 4:20 PMKevin Kho
@task
def rm_task(file_name):
ShellTask(command=f'echo {file_name}').run()
return
Kyle McChesney
08/03/2021, 4:26 PMfrom prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import LocalRun
from prefect.tasks.shell import ShellTask
@task
def files():
return [
'/Users/kylem/Dev/mb/var/prefect/rm-file/file1.txt',
'/Users/kylem/Dev/mb/var/prefect/rm-file/file2.txt',
'/Users/kylem/Dev/mb/var/prefect/rm-file/file3.txt',
]
@task
def rm(file):
return ShellTask(
command=f'rm {file}',
).run()
with Flow(
'rm',
run_config=LocalRun(),
executor=LocalDaskExecutor(),
) as flow:
files_to_rm = files()
rm.map(files_to_rm)
Kyle McChesney
08/03/2021, 4:30 PMKevin Kho
task.run()
call just runs the Python code underneath so for this purpose the retry and the task is the outer function now and you would put the retries and results there. ShellTask.run()
is not a task anymore. Maybe the more proper approach is to subclass the ShellTask
and then add your input and then call the super
? I think this approach and that would be the same.Kyle McChesney
08/03/2021, 4:51 PMKevin Kho