Amruth VVKP
05/03/2022, 10:45 AM@flow(name='primary_flow', version='1', task_runner=SequentialTaskRunner())
def primary_flow(**kwargs):
... # perform data processing tasks, all tasks are non-async
for cli_command in parsed.result():
shell_run_command(cli_command) # I've also tried using asyncio.run for running the shell task
Scenario 2 -
Primary Flow: Uses sequential task runner, a bunch of processing tasks. The final task to build and call a CLI is done through another async subflow
@flow(name='primary_flow', version='1', task_runner=SequentialTaskRunner())
def primary_flow(**kwargs):
... # perform data processing tasks, all tasks are non-async
for _ in parsed.result():
cli_flow(_) # I've also tried using asyncio.run for running the shell task
@flow(name='cli_flow', version='1', task_runner=SequentialTaskRunner())
async def cli_flow(parsed):
for _ in parsed:
... # Build CLI
await shell_run_command(cli_command)
I am getting this exception thrown (attached screenshot) and the CLI task is not getting started/executed. Can someone help me out on this one please (I am terrible with asnyc processes)?Anna Geller
from prefect import flow
from prefect_shell import shell_run_command
@flow
def example_shell_run_command_flow():
return shell_run_command(command="ls .", return_all=True)
if __name__ == "__main__":
example_shell_run_command_flow()
Anna Geller
from prefect import flow
from prefect_shell import shell_run_command
@flow
def example_shell_loop():
commands = ["mkdir test1", "mkdir test2", "mkdir test3"]
for cmd in commands:
shell_run_command(command=cmd, return_all=True)
if __name__ == "__main__":
example_shell_loop()
Amruth VVKP
05/03/2022, 11:24 AMAnna Geller
Amruth VVKP
05/03/2022, 11:27 AMAmruth VVKP
05/03/2022, 11:27 AMAnna Geller
Anna Geller
import asyncio
from prefect import flow
from prefect_shell import shell_run_command
@flow
async def example_shell_loop():
commands = ["mkdir -p test1", "mkdir -p test2", "mkdir -p test3"]
for cmd in commands:
await shell_run_command(command=cmd, return_all=True)
if __name__ == "__main__":
asyncio.run(example_shell_loop())
Anna Geller
import asyncio
from prefect import flow
from prefect_shell import shell_run_command
@flow
async def example_shell_run_command_flow():
await shell_run_command(command="ls .", return_all=True)
if __name__ == "__main__":
asyncio.run(example_shell_run_command_flow())
Amruth VVKP
05/03/2022, 11:40 AMAmruth VVKP
05/04/2022, 9:08 AMAnna Geller
Anna Geller
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by