Kyle McChesney

    1 year ago
    Has anyone worked extensively with ShellTask? Specifically, I am looking to use .map and include the mapped value in either the command or the helper_script. For example:
    @task
    def files():
        return [
            '/opt/file1.txt',
            '/opt/file2.txt',
            '/opt/file3.txt',
            '/opt/file4.txt',
        ]
    
    rm_task = ShellTask(
        command='rm $file',
    )
    
    with Flow('shell') as flow:
        files_to_delete = files()
        rm_task.map(files_to_delete, helper_script='file="{mapped_value}"')
    Kevin Kho

    1 year ago
    Hey @Kyle McChesney, would this work for you?
    from prefect import task, Flow
    from prefect.tasks.shell import ShellTask
    
    @task
    def files():
        return [
            '/opt/file1.txt',
            '/opt/file2.txt',
            '/opt/file3.txt',
            '/opt/file4.txt',
        ]
    
    @task
    def rm_task(file_name):
        return ShellTask(command=f'echo {file_name}').run()
    
    with Flow('shell') as flow:
        files_to_delete = files()
        rm_task.map(files_to_delete)
    
    flow.run()
    Or this:
    from prefect import task, Flow
    from prefect.tasks.shell import ShellTask
    
    @task
    def files():
        return [
            '/opt/file1.txt',
            '/opt/file2.txt',
            '/opt/file3.txt',
            '/opt/file4.txt',
        ]
    
    @task
    def format_commands(file_name):
        return f'echo "{file_name}"'
    
    rm_task = ShellTask()
    
    with Flow('shell') as flow:
        files_to_delete = files()
        commands = format_commands.map(files_to_delete)
        rm_task.map(command=commands)
    
    flow.run()
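One caveat when formatting commands from file names like this: a path containing spaces or shell metacharacters would be split or interpreted by the shell. A minimal sketch of a safer formatter, assuming the standard-library `shlex.quote` is acceptable here; `build_rm_command` is a hypothetical helper name and not part of Prefect:

```python
import shlex


def build_rm_command(file_name: str) -> str:
    """Return an rm command with the path quoted for a POSIX shell."""
    # shlex.quote leaves simple paths untouched and wraps anything
    # containing spaces or shell metacharacters in single quotes.
    return f"rm {shlex.quote(file_name)}"
```

The body of `format_commands` could return `build_rm_command(file_name)` instead of the raw f-string.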
    Kyle McChesney

    1 year ago
    I am trying the first one right now. It does not appear to actually run the task, as far as I can tell.
    Kevin Kho

    1 year ago
    Try
    @task
    def rm_task(file_name):
        ShellTask(command=f'echo {file_name}').run()
        return
    Kyle McChesney

    1 year ago
    Okay, yeah, with .run() it works:
    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor
    from prefect.run_configs import LocalRun
    from prefect.tasks.shell import ShellTask
    
    
    @task
    def files():
        return [
            '/Users/kylem/Dev/mb/var/prefect/rm-file/file1.txt',
            '/Users/kylem/Dev/mb/var/prefect/rm-file/file2.txt',
            '/Users/kylem/Dev/mb/var/prefect/rm-file/file3.txt',
        ]
    
    
    @task
    def rm(file):
        return ShellTask(
            command=f'rm {file}',
        ).run()
    
    
    with Flow(
        'rm',
        run_config=LocalRun(),
        executor=LocalDaskExecutor(),
    ) as flow:
        files_to_rm = files()
        rm.map(files_to_rm)
    How would this task-within-a-task approach affect things like visualization and checkpointing? Is Prefect aware (before or after running) that this task was created by the other one? Can we resume or retry these tasks if they fail? For some context, the rm example is simplified. I will likely be making more complex calls to run various binaries, etc. They will take a while to run, might produce results files, etc.
    Kevin Kho

    1 year ago
    The task.run() call just runs the Python code underneath, so for this purpose the outer function is now the task, and that is where you would put the retries and results. ShellTask.run() is not a task anymore. Maybe the more proper approach is to subclass ShellTask, add your input, and then call super? I think this approach and that one would behave the same.
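A plain-Python analogy of why the retries belong on the outer function. This is only a sketch and not Prefect's machinery: `call_with_retries`, `FlakyStep`, and `outer` are hypothetical names, with `FlakyStep.run()` standing in for an inner step that is invoked directly as an ordinary method call.

```python
def call_with_retries(func, *args, max_retries=3):
    """Hypothetical stand-in for the retry handling a runner applies to a task."""
    last_error = None
    for _ in range(max_retries):
        try:
            return func(*args)
        except Exception as error:  # remember the failure and try again
            last_error = error
    raise last_error


class FlakyStep:
    """Stand-in for an inner step called via .run(); fails on the first two calls."""

    def __init__(self):
        self.calls = 0

    def run(self, file_name):
        self.calls += 1
        if self.calls < 3:
            raise RuntimeError("transient failure")
        return f"removed {file_name}"


step = FlakyStep()


def outer(file_name):
    # step.run() is a direct method call with no retry logic of its own;
    # a failure propagates to whatever wraps outer().
    return step.run(file_name)


# Retries are applied to the outer function, which re-runs the inner call.
result = call_with_retries(outer, "/opt/file1.txt", max_retries=3)
```

In this sketch the inner step never retries itself. Each retry re-runs the whole outer function, which matches the point above that retries and results go on the outer task.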
    Kyle McChesney

    1 year ago
    Makes sense. Last question: is there anything on the roadmap around expanding which fields can be templated? For example, the command parameter in ShellTask?
    Kevin Kho

    1 year ago
    Short answer: there is no roadmap for this, and I don’t think we would make the task input a templatable field. I think templating is a mechanism for values that need to be specified at build time but only get populated at runtime, for example a result location or task name. Those templates get filled in during the run. It’s a bridge for filling in values of things that need to be “built”. In this case, though, the input to the task isn’t created at build time. It’s all deferred, so I don’t think we would template that in the future. This seems like something to handle in flow logic rather than by Prefect injecting values to support the Flow, if that makes sense.
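The build-time versus runtime distinction can be illustrated with ordinary string formatting. This is a plain-Python analogy under stated assumptions, not Prefect's implementation: `render`, `location_template`, and the context values are made up for illustration.

```python
# Build time: only the template string is known, not the values.
location_template = "{task_name}/{date}.txt"


def render(template: str, context: dict) -> str:
    """Fill a template with values that only exist at runtime."""
    return template.format(**context)


# Run time: the context now exists and is injected into the template.
runtime_context = {"task_name": "rm", "date": "2021-06-01"}
location = render(location_template, runtime_context)
```

A mapped task input is different: it is not a build-time setting waiting for a value, it is itself produced at runtime, so ordinary flow logic (such as a formatting task) can build the command.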