Thread
#prefect-community
    Amruth VVKP

    Amruth VVKP

    4 months ago
    I am getting a very wired error on Prefect Orion 2.0b3 when I am attempting to run a Prefect Shell task Here's how my flow is structured -Scenario 1 - Primary Flow: Flow is not a async flow. Uses sequential task runner, a bunch of processing tasks. The final task is to build a CLI and trigger a shell task that runs a particular test, makes few API calls, etc.
    @flow(name='primary_flow', version='1', task_runner=SequentialTaskRunner())
    def primary_flow(**kwargs):
        ... # perform data processing tasks, all tasks are non-async
        for cli_command in parsed.result():
            shell_run_command(cli_command) # I've also tried using asyncio.run for running the shell task
    Scenario 2 - Primary Flow: Uses sequential task runner, a bunch of processing tasks. The final task to build and call a CLI is done through another async subflow
    @flow(name='primary_flow', version='1', task_runner=SequentialTaskRunner())
    def primary_flow(**kwargs):
        ... # perform data processing tasks, all tasks are non-async
        for _ in parsed.result():
            cli_flow(_) # I've also tried using asyncio.run for running the shell task
    
    @flow(name='cli_flow', version='1', task_runner=SequentialTaskRunner())
    async def cli_flow(parsed):
        for _ in parsed:
            ... # Build CLI
            await shell_run_command(cli_command)
    I am getting this exception thrown (attached screenshot) and the CLI task is not getting started/executed. Can someone help me out on this one please (I am terrible with asnyc processes)?
    Anna Geller

    Anna Geller

    4 months ago
    it could be something with the event loop when you run this task in a for-loop - can you try a simple flow like this one and confirm whether this works for you?
    from prefect import flow
    from prefect_shell import shell_run_command
    
    
    @flow
    def example_shell_run_command_flow():
        return shell_run_command(command="ls .", return_all=True)
    
    
    if __name__ == "__main__":
        example_shell_run_command_flow()
    once that's working, you can try something like this:
    from prefect import flow
    from prefect_shell import shell_run_command
    
    
    @flow
    def example_shell_loop():
        commands = ["mkdir test1", "mkdir test2", "mkdir test3"]
        for cmd in commands:
            shell_run_command(command=cmd, return_all=True)
    
    
    if __name__ == "__main__":
        example_shell_loop()
    Amruth VVKP

    Amruth VVKP

    4 months ago
    Yes this works fine for me, btb I am using Python 3.7.5 if I've not mentioned that above
    Anna Geller

    Anna Geller

    4 months ago
    Amruth VVKP

    Amruth VVKP

    4 months ago
    Both the above examples are running good on my environment.
    I've tried using the code from my initial situation to run the same commands and it throws the exception when I am calling up the sub-flow
    Anna Geller

    Anna Geller

    4 months ago
    can you share the full flow code you are using? I can try to reproduce
    the same with async:
    import asyncio
    from prefect import flow
    from prefect_shell import shell_run_command
    
    
    @flow
    async def example_shell_loop():
        commands = ["mkdir -p test1", "mkdir -p test2", "mkdir -p test3"]
        for cmd in commands:
            await shell_run_command(command=cmd, return_all=True)
    
    
    if __name__ == "__main__":
        asyncio.run(example_shell_loop())
    same with the first flow - making it async:
    import asyncio
    from prefect import flow
    from prefect_shell import shell_run_command
    
    
    @flow
    async def example_shell_run_command_flow():
        await shell_run_command(command="ls .", return_all=True)
    
    
    if __name__ == "__main__":
        asyncio.run(example_shell_run_command_flow())
    Amruth VVKP

    Amruth VVKP

    4 months ago
    Both the above async flows are running just fine, I am trying to put up a sample code for you to reproduce (since my actual code is company confidential).
    I've found a work around for now, I've got a FastAPI server connected to the underlying Orion infrastructure. This FastAPI server is completely asynchronous and moving the existing shell command workflow to the FastAPI section allowed me to have perfectly concurent and non concurent calls performed. My wild guess is that part of my workflows are concurent and part of them aren't. Using a sequential runner might have overlapped async threads that caused the exception. Since I am not able to copy back the confidential code here, I wouldn't be able to get you a code to reproduce. Thanks again for the prompt response.
    Anna Geller

    Anna Geller

    4 months ago
    Sure, glad to help. Does it mean you are running a custom Orion fork only because the sync/async task setting didn't work for you? This seems like a drastic measure