https://prefect.io logo
#prefect-community
Title
# prefect-community
a

Amruth VVKP

05/03/2022, 10:45 AM
I am getting a very wired error on Prefect Orion 2.0b3 when I am attempting to run a Prefect Shell task Here's how my flow is structured - Scenario 1 - Primary Flow: Flow is not a async flow. Uses sequential task runner, a bunch of processing tasks. The final task is to build a CLI and trigger a shell task that runs a particular test, makes few API calls, etc.
Copy code
@flow(name='primary_flow', version='1', task_runner=SequentialTaskRunner())
def primary_flow(**kwargs):
    ... # perform data processing tasks, all tasks are non-async
    for cli_command in parsed.result():
        shell_run_command(cli_command) # I've also tried using asyncio.run for running the shell task
Scenario 2 - Primary Flow: Uses sequential task runner, a bunch of processing tasks. The final task to build and call a CLI is done through another async subflow
Copy code
@flow(name='primary_flow', version='1', task_runner=SequentialTaskRunner())
def primary_flow(**kwargs):
    ... # perform data processing tasks, all tasks are non-async
    for _ in parsed.result():
        cli_flow(_) # I've also tried using asyncio.run for running the shell task

@flow(name='cli_flow', version='1', task_runner=SequentialTaskRunner())
async def cli_flow(parsed):
    for _ in parsed:
        ... # Build CLI
        await shell_run_command(cli_command)
I am getting this exception thrown (attached screenshot) and the CLI task is not getting started/executed. Can someone help me out on this one please (I am terrible with asnyc processes)?
discourse 1
a

Anna Geller

05/03/2022, 11:19 AM
it could be something with the event loop when you run this task in a for-loop - can you try a simple flow like this one and confirm whether this works for you?
Copy code
from prefect import flow
from prefect_shell import shell_run_command


@flow
def example_shell_run_command_flow():
    return shell_run_command(command="ls .", return_all=True)


if __name__ == "__main__":
    example_shell_run_command_flow()
once that's working, you can try something like this:
Copy code
from prefect import flow
from prefect_shell import shell_run_command


@flow
def example_shell_loop():
    commands = ["mkdir test1", "mkdir test2", "mkdir test3"]
    for cmd in commands:
        shell_run_command(command=cmd, return_all=True)


if __name__ == "__main__":
    example_shell_loop()
a

Amruth VVKP

05/03/2022, 11:24 AM
Yes this works fine for me, btb I am using Python 3.7.5 if I've not mentioned that above
a

Anna Geller

05/03/2022, 11:25 AM
a

Amruth VVKP

05/03/2022, 11:27 AM
Both the above examples are running good on my environment.
I've tried using the code from my initial situation to run the same commands and it throws the exception when I am calling up the sub-flow
a

Anna Geller

05/03/2022, 11:30 AM
can you share the full flow code you are using? I can try to reproduce
the same with async:
Copy code
import asyncio
from prefect import flow
from prefect_shell import shell_run_command


@flow
async def example_shell_loop():
    commands = ["mkdir -p test1", "mkdir -p test2", "mkdir -p test3"]
    for cmd in commands:
        await shell_run_command(command=cmd, return_all=True)


if __name__ == "__main__":
    asyncio.run(example_shell_loop())
same with the first flow - making it async:
Copy code
import asyncio
from prefect import flow
from prefect_shell import shell_run_command


@flow
async def example_shell_run_command_flow():
    await shell_run_command(command="ls .", return_all=True)


if __name__ == "__main__":
    asyncio.run(example_shell_run_command_flow())
a

Amruth VVKP

05/03/2022, 11:40 AM
Both the above async flows are running just fine, I am trying to put up a sample code for you to reproduce (since my actual code is company confidential).
👍 1
I've found a work around for now, I've got a FastAPI server connected to the underlying Orion infrastructure. This FastAPI server is completely asynchronous and moving the existing shell command workflow to the FastAPI section allowed me to have perfectly concurent and non concurent calls performed. My wild guess is that part of my workflows are concurent and part of them aren't. Using a sequential runner might have overlapped async threads that caused the exception. Since I am not able to copy back the confidential code here, I wouldn't be able to get you a code to reproduce. Thanks again for the prompt response.
a

Anna Geller

05/04/2022, 9:43 AM
Sure, glad to help. Does it mean you are running a custom Orion fork only because the sync/async task setting didn't work for you? This seems like a drastic measure
23 Views