What is the best practice for running sub-flows in parallel (not thread concurrent), while keeping a...
r
What is the best practice for running sub-flows in parallel (not thread concurrent), while keeping all the benefits of UI observability? The above thread addresses the issue partly, suggesting the use of `run_deployment` to create sub-flows. While this works technically, it partially severs the link between the main flow and its subflows. When I try the `run_deployment` sub-flow method, the sub-flows do appear within the main flow, but I get a blank execution graph (see image 1). I even have a task within the main flow, but that doesn't show up on the execution graph either (see image 2). So while I THINK `run_deployment` is working as expected, I certainly can't verify that parallel execution is working via the UI.

## Situation and additional requirements

I need to be able to run a flow of sub-flows where each sub-flow runs in parallel, at the same time. The sub-flows can also have their own sub-flows, and they all need to be classified as flows (rather than tasks) so that any of the sub-components can be run independently if/when needed. I also need to be able to return the state from each of the sub-flows so that if any of them fail, I can mark the main flow as failed. (Something I haven't figured out yet.) It would be great to have the ability to return a result to the main flow, but that might be a tough ask with `run_deployment`, which is another reason using `run_deployment` seems like a compromise compared to the usual seamless functionality.

## Conclusion

Wishlist summary
• Create sub-flows that can run in parallel
• Have all the normal UI functionality work the same as any other sub-flow
• Ability to get the state of all sub-flows within the main flow
• Ability to return results from sub-flows within the main flow

Please let me know if there is a better way to create parallel sub-flows. I'm happy to submit an issue, but I wanted to ask here in case I'm missing something obvious! 😉
I forgot to mention I'm on version 2.16.2
d
I was able to get subflows to work correctly. We don't directly use the prefect decorators for things, so we do some monkey business that can automatically switch between running things locally with threads vs. invoking deployments.
I'm not sure what's going on with your subflow or task graph. We don't have any issues with it!
r
What version are you running?
And did you end up using `run_deployment` to create a subflow?
d
Yes, we use `run_deployment` under the hood
I took a look at the underlying code and I may end up writing my own version of `run_deployment`
I'm using 2.19.0
r
Ah, I started the flow via `asyncio.run`. That seems to have been the culprit. If I start the main flow via `run_deployment` as well, then the graph shows up. Thanks for the quick reply Dev, I appreciate it!!! 😄
d
`asyncio` trips me up at least once a week 🙈
Cool! Happy chat -- I'm curious to see how you've decided to implement the parallel flows.
r
By the way, do you return the state or get results from your `run_deployment` subflows? How do you deal with those issues?
d
That is a bit of a pain
We store our results in S3
There are some issues with extracting results from async code and from the Prefect result wrapper
m
Hi @Richard Alexander, I have been using the `run_deployment` workaround to manage parallel flows with subflows. It's going pretty well, with a couple of pain points. I initiate from a deployment which calls many other deployments as subflows:
import asyncio

from prefect import flow
from prefect.deployments import run_deployment

@flow
async def run_many(deployment_list):
    flow_runs = []
    # TaskGroup (Python 3.11+) waits for every task before the block exits
    async with asyncio.TaskGroup() as tg:
        for deployment in deployment_list:
            task = tg.create_task(
                run_deployment(
                    name=deployment.get('slug'),
                    parameters=deployment.get('params'),
                    timeout=3600,
                )
            )
            flow_runs.append(task)
            # stagger the submissions by one second
            await asyncio.sleep(1)

    # this is important later
    final_states = [x.result().state.type.value for x in flow_runs]

    return check_bulk_states(final_states)
I'm able to call this function without an `await` like this:
result = run_many(my_list_of_deployments)
So, yes, the subflows, which are flow runs of the deployed flows, are detached in a sense from the outer flow, but with `timeout` set it will wait for the result to be returned. The `check_bulk_states` function is how I determine success/failure status. As you mentioned, the outer flow will succeed as long as the subflows don't time out, even if they fail:
from prefect import get_run_logger
from prefect.states import Failed

def check_bulk_states(states_list):
    """
    TaskGroup flows will succeed regardless of end state for subflow async deployment runs. This function returns a Failed state to be passed back from the flow.
    """
    logger = get_run_logger()

    if not all(x == "COMPLETED" for x in states_list):
        logger.error(f"Check final states: {states_list}")
        return Failed(message="State check FAILED: one or more jobs failed.")
    else:
        logger.info(f"Check final states: {states_list}")
        logger.info("SUCCESS: all jobs complete.")
        return states_list
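A possible companion to `check_bulk_states`, sketched here as an assumption rather than something from the thread: because `run_many` appends tasks in the same order as `deployment_list`, the final states can be zipped back onto the deployments to see which ones did not complete. The helper name `find_unfinished` and the sample data are hypothetical, and the function uses only plain Python.

```python
def find_unfinished(deployment_list, final_states):
    """Return the deployment entries whose final state is not "COMPLETED".

    Hypothetical helper: assumes final_states[i] belongs to deployment_list[i],
    which holds when states are collected in submission order.
    """
    if len(deployment_list) != len(final_states):
        raise ValueError("deployment_list and final_states must be the same length")
    # Keep the whole entry (slug and params), since the same deployment
    # may appear several times with different parameters.
    return [
        deployment
        for deployment, state in zip(deployment_list, final_states)
        if state != "COMPLETED"
    ]
```

The returned list has the same shape as the input to `run_many`, so it could in principle be logged or passed back in to retry only those entries.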
The two pain points:
1. If one subflow fails, you can't re-run the outer flow and have it redo only the failed job. It re-runs all of them.
2. I use git repo flow code storage, and since these are deployments, it calls `git clone` for every single one. If I am running a SQL DDL backfill, for instance, I can have 1000s of `git clone` calls, and they hit this error: `IOError: [Errno 24] Too many open files`. I'd prefer to call a deployment that pulls the code once and then runs parallel subflows from there instead.
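One way the second pain point might be softened, offered as an assumption and not as something anyone in the thread tested: cap how many launches are in flight at once with an `asyncio.Semaphore`. This does not avoid one clone per deployment run, and whether it is enough to stay under the open-file limit depends on the environment. The names `run_bounded`, `launch`, and `max_in_flight` are hypothetical; `launch` stands in for a coroutine that wraps `run_deployment`.

```python
import asyncio


async def run_bounded(jobs, launch, max_in_flight=10):
    """Await launch(job) for every job, with at most max_in_flight running at once."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def guarded(job):
        # Each job waits here until one of the max_in_flight slots is free.
        async with semaphore:
            return await launch(job)

    # gather returns results in the same order as the input jobs.
    return await asyncio.gather(*(guarded(job) for job in jobs))
```

Inside a flow like `run_many`, `launch` could be a small coroutine that calls `run_deployment` with the slug and parameters from each entry.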
Lastly, the first rc for Prefect 3.0 is out, it looks like subtasks will be supported and those could run parallel concurrently without the `run_deployment` workaround. Also, the reason I used `run_deployment` workaround was because of this now-closed issue, so perhaps concurrent identical subflows (with different parameters) will run properly now.
r
Thanks for adding this solution @merlin! It's very similar to what I came up with, so I'll add my pseudo-code as well:
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run

@flow
async def main_flow(params=None):
    sub_flows = []
    # timeout=0 returns the flow run immediately instead of waiting for it to finish
    sub_flows.append(
        await run_deployment(
            'flow_name/deployment_name',
            parameters=params,
            timeout=0,
        )
    )

    # ADD MORE SUBFLOWS HERE...

    # block until each sub-flow run has finished
    for flow_run in sub_flows:
        await wait_for_flow_run(flow_run.id, poll_interval=5)
The main difference is my use of `timeout=0` in the run_deployment arguments and then `wait_for_flow_run()`. This avoids the need to explicitly call asyncio, but I can also see advantages to using your method in certain situations.
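The submit-then-wait shape described here can be illustrated without Prefect at all. The following is only an analogy under stated assumptions: `fake_flow_run` is a hypothetical stand-in for a deployed flow run, `asyncio.create_task` plays the role of `run_deployment(..., timeout=0)`, and awaiting each handle plays the role of `wait_for_flow_run()`. It shows that every run is started before any of them is waited on, so waiting in a plain loop does not serialize the work.

```python
import asyncio


async def fake_flow_run(name, seconds, events):
    """Hypothetical stand-in for a sub-flow: records when it starts and ends."""
    events.append(f"start {name}")
    await asyncio.sleep(seconds)
    events.append(f"end {name}")
    return "COMPLETED"


async def submit_then_wait():
    events = []
    # Submit phase: schedule every run without waiting for any of them.
    handles = [
        asyncio.create_task(fake_flow_run(name, seconds, events))
        for name, seconds in [("a", 0.05), ("b", 0.01)]
    ]
    # Wait phase: wait on each handle in turn; the runs are already in progress.
    states = [await handle for handle in handles]
    return events, states
```

Running `asyncio.run(submit_then_wait())` records both starts before either end, even though the first handle awaited belongs to the slower run.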
Let's hope 3.0 makes some of these complex scenarios smoother! :-) Question: Do you ever have a need to get return values back from the subflows to the main flow? Is that even possible with run_deployment?
m
this is awesome. up front I'll admit asyncio confuses me. I haven't needed return values yet, but I think it's possible because my final_states call is getting the flow run back from x.result() for each flow_run above. that object probably holds other return values along with the states I was using
y
@merlin You wrote: "Lastly, the first rc for Prefect 3.0 is out, it looks like subtasks will be supported and those could run parallel concurrently without the..." Parallel or concurrently? They are two different things 🙂 What I wish for in Prefect 3.0 and the new "task within a task" is that you would also be able to define MULTIPLE task runners on a flow. This way some tasks could .submit() to the concurrent runner and some tasks could .submit() to a parallel runner (like Dask).
m
This is a great point -- rant about to begin. What my role needs is orchestration where I don't have to write software designed around that difference. Me just want run some jobs same time, if have more machine then run multiple machines maybe ok maybe not ok, I'll let the orchestrator know. But so far I've had to learn about asyncio, task groups, changing all nested function definitions to `async def`, on and on. The thing I need to optimize is the large number of jobs with interspersed dependencies, performance tuning is not a factor at all. I just have work to do and a lot of people depending on my output and I need to scale. Meanwhile concurrent parametrized flows has been a head scratcher for a while, and for a long time calling `run_deployment` for each was the official workaround.
y
@merlin take a look at my ECSTaskRunner:
With an ECSTaskRunner + the ability to define multiple task runners on a flow + task within a task coming in v3.0, such a combination could give a very pleasant experience for doing things in parallel and/or concurrently.
You would have a single flow, so everything is nicely displayed under it (no child flows).
If you need some quick HTTP call with a retry on it, just .submit() to the concurrent runner. If you have something that needs some CPU/memory of its own, then .submit() to the ECSTaskRunner.
Upvote the ECSTaskRunner and we'll gently push the Prefect team towards this 😀
m
This makes a lot of sense! Here is the bug that diverted me onto run_deployment for concurrent parametrized flows, issue 7319, issue 7322, looks like resolved but I haven't gone back to refactor since then. Did you run into this problem previously?
y
@merlin I haven't tried this use case…