Richard Alexander
05/31/2024, 1:23 PM
I'm using `run_deployment` to create sub-flows. While this works technically, it partially severs the link between the main flow and its subflows. When I try the `run_deployment` sub-flow method, the sub-flows do appear within the main flow, but I get a blank execution graph (see image 1).
I even have a task within the main flow, but that doesn't show up on the execution graph either (see image 2).
So while I THINK `run_deployment` is working as expected, I certainly can't verify that parallel execution is working via the UI.
## Situation and additional requirements
I need to be able to run a flow of sub-flows where all the sub-flows run in parallel, at the same time. The sub-flows can also have their own sub-flows, and they all need to be classified as flows (rather than tasks) so that any of the sub-components can be run independently if/when needed.
I also need to be able to return the state from each of the sub-flows so that if any of them fail, I can mark the main flow as failed. (Something I haven't figured out yet.)
It would be great to have the ability to return a result to the main flow, but that might be a tough ask with `run_deployment`, which is another reason using `run_deployment` seems like a compromise compared to the usual seamless functionality.
## Conclusion
Wishlist summary
• Create sub-flows that can run in parallel
• Have all the normal UI functionality work the same as any other sub-flow
• Ability to get the state of all sub-flows within the main flow
• Ability to return results from sub-flows within the main flow
Please let me know if there is a better way to create parallel sub-flows. I'm happy to submit an issue, but I wanted to ask here in case I'm missing something obvious! 😉

Richard Alexander
05/31/2024, 1:25 PM

Dev Dabke
05/31/2024, 2:20 PM

Dev Dabke
05/31/2024, 2:21 PM

Richard Alexander
05/31/2024, 2:23 PM

Richard Alexander
05/31/2024, 2:24 PM
`run_deployment` to create a subflow?

Dev Dabke
05/31/2024, 2:30 PM
`run_deployment` under the hood

Dev Dabke
05/31/2024, 2:30 PM
`run_deployment`

Dev Dabke
05/31/2024, 2:30 PM

Richard Alexander
05/31/2024, 2:41 PM
I was starting the main flow with `asyncio.run`. That seems to have been the culprit.
If I start the main flow via `run_deployment` as well, then the graph shows up.
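In other words, the only thing that changed was how the parent run gets started; something like this (a sketch, the deployment name is made up):

```python
from prefect.deployments import run_deployment

# What I had before -- starting the parent flow as a local script:
#     asyncio.run(main_flow())
# gave a blank execution graph.

# Starting the parent from its own deployment instead shows the graph:
run_deployment("main-flow/my-deployment")
```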
Thanks for the quick reply Dev, I appreciate it!!! 😄

Dev Dabke
05/31/2024, 2:42 PM
`asyncio` trips me up at least once a week 🙈

Dev Dabke
05/31/2024, 2:42 PM

Richard Alexander
05/31/2024, 2:45 PM
`run_deployment` subflows? How do you deal with those issues?

Dev Dabke
05/31/2024, 4:07 PM

Dev Dabke
05/31/2024, 4:07 PM

Dev Dabke
05/31/2024, 4:07 PM

merlin
06/06/2024, 9:41 PM
I've been using the `run_deployment` workaround to manage parallel flows with subflows. It's going pretty well, with a couple of pain points.
I initiate from a deployment which calls many other deployments as subflows:
```python
import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow
async def run_many(deployment_list):
    flow_runs = []
    async with asyncio.TaskGroup() as tg:
        for deployment in deployment_list:
            task = tg.create_task(
                run_deployment(
                    name=deployment.get('slug'),
                    parameters=deployment.get('params'),
                    timeout=3600,
                )
            )
            await asyncio.sleep(1)  # brief stagger between kick-offs
            flow_runs.append(task)

    # this is important later
    final_states = [x.result().state.type._value_ for x in flow_runs]
    return check_bulk_states(final_states)
```
I'm able to call this function without an `await`, like this:
```python
result = run_many(my_list_of_deployments)
```
So yes, the subflows (which are flow runs of the deployed flows) are detached in a sense from the outer flow, but with `timeout` set it will wait for the result to be returned.
The `check_bulk_states` function is how I determine success/failure status; as you mentioned, the outer flow will succeed if the subflows don't time out, even if they fail:
```python
from prefect import get_run_logger
from prefect.states import Failed


def check_bulk_states(states_list):
    """
    TaskGroup flows will succeed regardless of end state for subflow async
    deployment runs. This function returns a Failed state to be passed back
    from the flow.
    """
    logger = get_run_logger()
    if not all(x == "COMPLETED" for x in states_list):
        logger.error(f"Check final states: {states_list}")
        return Failed(message="State check FAILED: one or more jobs failed.")
    else:
        logger.info(f"Check final states: {states_list}")
        logger.info("SUCCESS: all jobs complete.")
        return states_list
```
merlin
06/06/2024, 9:46 PM
One pain point is that there's a `git clone` for every single one. If I am running a SQL DDL backfill, for instance, I can have 1000s of `git clone` calls and they hit this error: `IOError: [Errno 24] Too many open files`. I'd prefer to call a deployment that pulls the code once, and then runs a parallel subflow from there instead.

merlin
06/06/2024, 9:50 PM
`run_deployment` workaround.
Also, the reason I used the `run_deployment` workaround was because of this now-closed issue, so perhaps concurrent identical subflows (with different parameters) will run properly now.
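If that issue really is fixed, then plain nested subflows might be enough now; something like this (untested sketch, the flow names are made up):

```python
import asyncio

from prefect import flow


@flow
async def load_table(table: str):
    ...  # per-table work would go here


@flow
async def load_all(tables: list[str]):
    # the same subflow, called concurrently with different parameters
    await asyncio.gather(*[load_table(t) for t in tables])
```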
06/07/2024, 7:49 PMfrom prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
@flow
async def main_flow():
sub_flows = []
sub_flows.append(
await run_deployment('flow_name/deployment_name',
parameters=params,
timeout=0
)
)
# ADD MORE SUBFLOWS HERE...
for flow_run in sub_flows:
await wait_for_flow_run(flow_run.id, poll_interval=5)
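To also cover the "mark the main flow as failed" item from my wishlist, I think the waiting loop could be extended along these lines (untested sketch, building on the snippet above):

```python
from prefect.states import Failed


@flow
async def main_flow():
    sub_flows = []
    # ... kick off the sub-flows with timeout=0 as above ...

    # wait_for_flow_run returns the finished FlowRun, so each sub-flow's
    # final state can be inspected once all the waits are done
    finished = [
        await wait_for_flow_run(flow_run.id, poll_interval=5)
        for flow_run in sub_flows
    ]
    if not all(run.state.is_completed() for run in finished):
        return Failed(message="One or more sub-flows did not complete.")
```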
Richard Alexander
06/07/2024, 7:52 PM
I'm using `timeout=0` in the `run_deployment` arguments and then `wait_for_flow_run()`. This avoids the need to explicitly call asyncio, but I can also see advantages to using your method in certain situations.

Richard Alexander
06/07/2024, 7:54 PM

merlin
06/07/2024, 11:08 PM

Yaron Levi
07/06/2024, 2:15 PM

merlin
07/06/2024, 8:06 PM
`async def`, on and on. The thing I need to optimize is the large number of jobs with interspersed dependencies; performance tuning is not a factor at all. I just have work to do and a lot of people depending on my output, and I need to scale.
Meanwhile, concurrent parametrized flows have been a head scratcher for a while, and for a long time calling `run_deployment` for each was the official workaround.

Yaron Levi
07/06/2024, 9:23 PM

Yaron Levi
07/06/2024, 9:23 PM

Yaron Levi
07/06/2024, 9:25 PM

Yaron Levi
07/06/2024, 9:26 PM

Yaron Levi
07/06/2024, 9:27 PM

Yaron Levi
07/06/2024, 9:29 PM

merlin
07/07/2024, 7:41 AM

Yaron Levi
07/07/2024, 9:06 AM