Kiley Roberson

09/18/2023, 9:24 PM
Hello! I'm looking for some help with a problem I'm running into. I have a parent flow that runs a bunch of subflows, X at a time, in batches. However, I want it to execute with rolling concurrency instead, so that there are always X running and a new one launches as soon as one finishes. I was able to get this to work with some test code (sleep x seconds, then print), but I'm running into an issue when trying to run the product code. It seems I need to launch the code in a new thread so that it's async and doesn't block the event loop, like this:
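import asyncio

async def my_subflow(id: str):
    def run_logic():
        # Blocking product logic
        product_logic(id)

    # Run the blocking call in the default thread pool so the event loop stays free
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, run_logic)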
However, I get this error when trying to set up the logger:
Flow run encountered an exception. MissingContextError: There is no active flow or task run context
So some needed context is being lost when I create the new thread. I'm wondering if there is a different approach for executing the logic as a non-blocking routine, or some way to set the needed context. Thanks!
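For reference, here is a minimal sketch of one way the context could be carried into the worker thread. It relies on a standard-library fact: `loop.run_in_executor` does not propagate `contextvars` to the thread, whereas `contextvars.copy_context().run(...)` (or `asyncio.to_thread` on Python 3.9+) does. Whether this resolves the `MissingContextError` is an assumption: it only helps if the run context the logger needs is stored in `contextvars`. The names `run_context`, `run_rolling`, and the body of `product_logic` are hypothetical stand-ins, not Prefect APIs.

```python
import asyncio
import contextvars
import functools

# Hypothetical stand-in for the run context that the logger lookup needs.
run_context = contextvars.ContextVar("run_context", default=None)


def product_logic(id: str) -> str:
    # Placeholder for the real blocking work; it fails if the context is missing.
    ctx = run_context.get()
    if ctx is None:
        raise RuntimeError("There is no active context")
    return f"{ctx}:{id}"


async def my_subflow(id: str) -> str:
    loop = asyncio.get_running_loop()
    # Snapshot the caller's context and run the blocking function inside it,
    # so context variables set by the parent are visible in the worker thread.
    ctx = contextvars.copy_context()
    call = functools.partial(ctx.run, product_logic, id)
    return await loop.run_in_executor(None, call)


async def run_rolling(ids, limit: int):
    # Rolling concurrency: at most `limit` subflows run at once, and a new one
    # starts as soon as any running one finishes.
    sem = asyncio.Semaphore(limit)

    async def guarded(id: str) -> str:
        async with sem:
            return await my_subflow(id)

    return await asyncio.gather(*(guarded(i) for i in ids))


async def main():
    run_context.set("parent-flow")
    return await run_rolling(["a", "b", "c"], limit=2)


results = asyncio.run(main())
```

On Python 3.9+, `await asyncio.to_thread(product_logic, id)` is a shorter equivalent, since it copies the current context into the thread automatically.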