Muktesh
03/14/2025, 12:46 AMsub_flows = []
sub_flows.extend(
[
await run_deployment(
"pipeline-subflow/subflow-deployment",
parameters={
"params": SubflowInput(
project=params.project,
id=params.id,
filename=params.filename,
metadata=metadata_results.metadata,
runtime_context=runtime_context,
).model_dump(),
},
work_queue_name="processing-subflows",
timeout=0,
)
for segment in results.segments
]
)
results = list[SubflowOutput]()
futures = [await wait_for_flow_run(flow_run.id) for flow_run in sub_flows]
for future in futures:
result = await future.state.result(fetch=True, raise_on_failure=True)
results.append(SubflowOutput(**result))
However, I am noticing failures for certain flow runs. Interestingly all the tasks and subflows for the said flow are in success state. Here is the error:
File "/usr/local/lib/python3.12/site-packages/prefect/flow_runs.py", line 153, in wait_for_flow_run
raise FlowRunWaitTimeout(
prefect.exceptions.FlowRunWaitTimeout: Flow run with ID 9527c2b3-1017-4972-88b5-3c99db6e7392 exceeded watch timeout of 10800 seconds
Any help to resolve this would be appreciated. Thanks!Muktesh
03/14/2025, 12:49 AM