Blake Stefansen
05/11/2023, 6:36 PM
[...] run_deployment(). I will await all of these flow runs to complete, at which point I check if all of the futures have a state of "Completed".
However, this deployment I'm running also creates a subflow just before the parent flow completes, and I would like to check that the subflow's state is "Completed" as well. The future (parent flow) returns a state when the parent flow completes, but I have no way to check whether the subflow completes.
QUESTION
Is there a way to use the response of the run_deployment() method to check if a subflow is Complete? How can I programmatically get the state of a subflow if a subflow is created with run_deployment()?
from test_broadband_portal_backend_config import params
import asyncio
import os

import pytest
from prefect.blocks.system import Secret
from prefect.deployments import run_deployment


@pytest.fixture
def input_data_params():
    return params


def create_prefect_deployment_run(input_data):
    submission_key = input_data["input"]["submission_key"]
    s3_filepath = input_data["input"]["s3_filepath"]
    submission_flow_params = {
        "submission_key": submission_key,
        "s3_filepath": s3_filepath
    }
    # Remove the "S3://" prefix
    s3_filepath = input_data["input"]["s3_filepath"].replace("S3://", "")
    # Extract the object name using os.path.basename
    object_name = os.path.basename(s3_filepath)
    # Start a run of the deployment, waiting up to 300 seconds for it to finish.
    # The test below awaits the values returned here via asyncio.gather.
    response = run_deployment(
        name='isp-portal-submission/isp_portal_submission_yaml_deploy_k8',
        parameters=submission_flow_params,
        flow_run_name=object_name,
        timeout=300
    )
    return response


class TestPrefectPipeline:
    ss_candy = Secret.load("spatial-stream-candy").get()

    @pytest.mark.asyncio
    async def test_submission_flow_completes(self, input_data_params):
        flows = []
        for input_data in input_data_params:
            flow = create_prefect_deployment_run(input_data)
            flows.append(flow)
        # Wait for every parent flow run, then check each one's final state
        responses = await asyncio.gather(*flows)
        for response in responses:
            assert response.state_name == "Completed"
Nate
05/11/2023, 6:57 PM
"this deployment I'm running also creates a subflow once the flow completes"
A subflow can only be created while its parent flow is running, since a subflow is just a flow called from a running flow. But it seems like if you returned the result of run_deployment (as called from the parent flow), which is a FlowRun object, you could grab that result and check the state that's stored for that flow run in the result.
Blake Stefansen
05/11/2023, 7:03 PM
"this deployment I'm running also creates a subflow just before the parent flow completes"
[...] run_deployment()) will also use run_deployment() with a timeout of 0 at the end of the flow.
AKA a parent deployment run is creating a child deployment run.
I'm thinking maybe I just need to increase the timeout for the child deployment run, AKA have the parent flow complete once the child flow completes?
Nate
05/11/2023, 7:19 PM
[...] timeout=0 in the parent?
Blake Stefansen
05/11/2023, 7:21 PM
[...] subflow_id that I can see)
Nate
05/11/2023, 7:24 PM
"but I don't think that's possible?"
I believe you're correct bc if you do timeout=0 when calling run_deployment in the parent, it won't (can't) return a FlowRun that represents the finished state of the child run, since the parent would have to wait for the child to finish to report the FlowRun back to the for loop's context.
[...] FlowRun that run_deployment(..., timeout=0) gives you in order to later fetch the child flow result from the API.
Blake Stefansen
05/11/2023, 7:28 PM
Nate
05/11/2023, 7:29 PM
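For reference, the idea discussed above (keep hold of the flow run that run_deployment(..., timeout=0) returns and look it up again later) could be sketched roughly like this. It is only a sketch under assumptions: wait_for_final_state, fetch_state_name_from_prefect, and TERMINAL_STATE_NAMES are hypothetical names made up for illustration, the set of final state names assumes Prefect's defaults, and the lookup assumes the Prefect 2 client (get_client and read_flow_run). How the child's flow run ID gets from the parent flow to the test is left open.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional

# Assumption: Prefect's default names for final states.
TERMINAL_STATE_NAMES = {"Completed", "Failed", "Crashed", "Cancelled"}


async def fetch_state_name_from_prefect(flow_run_id) -> Optional[str]:
    """Look up a flow run by ID through the Prefect API and return its state name."""
    # Imported lazily so the polling helper below has no hard Prefect dependency.
    from prefect import get_client

    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        return flow_run.state_name


async def wait_for_final_state(
    flow_run_id,
    fetch_state_name: Callable[[object], Awaitable[Optional[str]]] = fetch_state_name_from_prefect,
    poll_seconds: float = 5.0,
    timeout_seconds: float = 300.0,
) -> Optional[str]:
    """Poll until the run reaches a final state or the deadline passes.

    Returns the last state name that was seen, which may not be final
    if the deadline was hit first.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        state_name = await fetch_state_name(flow_run_id)
        if state_name in TERMINAL_STATE_NAMES or time.monotonic() >= deadline:
            return state_name
        await asyncio.sleep(poll_seconds)
```

In the test, once the child's flow run ID is known (for example, if the parent flow returned it), something like assert await wait_for_final_state(child_flow_run_id) == "Completed" would check the child run separately from the parent.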