Richard Hughes
07/17/2020, 7:02 PMnicholas
ShellTaskRichard Hughes
07/17/2020, 7:55 PMShellTasknicholas
Richard Hughes
07/17/2020, 8:11 PMRichard Hughes
07/17/2020, 10:05 PMnicholas
Richard Hughes
07/21/2020, 6:12 PMimport os
import shlex
import sys
from prefect import task, Flow
from subprocess import Popen, PIPE
from prefect.triggers import all_successful, any_failed, some_successful
@task(name="PowerShell")
def sub_process_task(command):
    command=shlex.split(command)
    commandArray = ["powershell.exe"] + command
    p = Popen(commandArray, stdout=PIPE, stderr=PIPE)
    output, error = p.communicate()
    if p.returncode != 0: 
        raise Exception("powershell failed %d %s %s" % (p.returncode, output.decode("utf-8"), error.decode("utf-8")))
    else:
        #print("powershell succeeded %d %s" % (p.returncode, output.decode("utf-8")))
        return output.decode("utf-8")
@task(name="Print", trigger=all_successful)
def output_handler(output):
    print('Finished Running the flow')
    print(output)
    print('Finished Printing the flow')
with Flow('PowerShell') as flow:
    command="Write-Host 'Hello World'; Write-Host 'Test12345'; Get-Location; Get-ChildItem -Dir;"
    output=sub_process_task(command=command)
    output_handler(output)
flow.run()nicholas
