Blake Stefansen

05/11/2023, 6:36 PM
Hey everyone,

CONTEXT: I have a for loop that creates a bunch of response futures using `run_deployment()`. I `await` all of these flow runs until they complete, at which point I check whether all of the futures have a state of "Completed". However, the deployment I'm running also creates a subflow just before the parent flow completes, and I would like to check that the subflows complete as well. The future (parent flow) returns a state when the parent flow completes, but I have no way to check whether the subflow completes.

QUESTION: Is there a way to use the response of the `run_deployment()` method to check if a subflow is Complete? How can I programmatically get the state of a subflow if the subflow is created with `run_deployment()`?
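import asyncio
import os

import pytest
from prefect.blocks.system import Secret
from prefect.deployments import run_deployment

from test_broadband_portal_backend_config import params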

@pytest.fixture
def input_data_params():
    return params

def create_prefect_deployment_run(input_data):
    submission_key = input_data["input"]["submission_key"]
    s3_filepath = input_data["input"]["s3_filepath"]
    submission_flow_params = {
        "submission_key": submission_key,
        "s3_filepath": s3_filepath
    }

    # Remove the "S3://" prefix
    s3_filepath = input_data["input"]["s3_filepath"].replace("S3://", "")

    # Extract the object name using os.path.basename
    object_name = os.path.basename(s3_filepath)

    response = run_deployment(
        name='isp-portal-submission/isp_portal_submission_yaml_deploy_k8',
        parameters=submission_flow_params,
        flow_run_name=object_name,
        timeout=300
    )
    return response


class TestPrefectPipeline:
    ss_candy = Secret.load("spatial-stream-candy").get()

    @pytest.mark.asyncio
    async def test_submission_flow_completes(self, input_data_params):
        flows = []
        for input_data in input_data_params:
            flow = create_prefect_deployment_run(input_data)
            flows.append(flow)

        responses = await asyncio.gather(*flows)
        for response in responses:
            assert response.state_name == "Completed"
I may have to just store the flow run IDs for all of the parent flow futures, use those to get the flow run IDs of the subflows, and then use the REST API to poll the flow run state for a specified amount of time: https://app.prefect.cloud/api/docs#tag/Flow-Run-States
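The polling idea above could look roughly like the sketch below. It is only an illustration under stated assumptions: `wait_for_terminal_state`, `fetch_state_name`, and `TERMINAL_STATE_NAMES` are hypothetical names, the set of terminal state names assumes Prefect's default state names, and the Prefect wiring in the trailing comment is an untested guess at how the lookup could be done with the Python client rather than raw REST calls.

```python
import asyncio
import time
from typing import Awaitable, Callable

# Assumption: default Prefect state names that mean "this run is finished".
TERMINAL_STATE_NAMES = {"Completed", "Failed", "Crashed", "Cancelled"}


async def wait_for_terminal_state(
    fetch_state_name: Callable[[str], Awaitable[str]],
    flow_run_id: str,
    timeout: float = 300.0,
    poll_interval: float = 5.0,
) -> str:
    """Poll a flow run until it reaches a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        state_name = await fetch_state_name(flow_run_id)
        if state_name in TERMINAL_STATE_NAMES:
            return state_name
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Flow run {flow_run_id} still {state_name!r} after {timeout}s"
            )
        await asyncio.sleep(poll_interval)


# Hypothetical wiring to Prefect (an assumption, not executed here):
#
#     from prefect import get_client
#
#     async def fetch_state_name(flow_run_id: str) -> str:
#         async with get_client() as client:
#             flow_run = await client.read_flow_run(flow_run_id)
#             return flow_run.state_name
```

Passing the lookup in as a callable keeps the polling loop independent of how the state is fetched, so the same loop would work with a REST call or a stub in a unit test.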

Nate

05/11/2023, 6:57 PM
hi @Blake Stefansen, I'm not sure I understand what is meant by the following (I might be misunderstanding what you're saying):
this deployment I'm running also creates a subflow once the flow completes
a subflow can only be created while its parent flow is running, since a subflow is just a flow called from a running flow. But it seems like if you returned the result of `run_deployment` (as called from the parent flow), which is a `FlowRun` object, you could grab that result and check the state that's stored for that flow run in the result
so a bit of a combo of the links above

Blake Stefansen

05/11/2023, 7:03 PM
Thanks @Nate, I will take a look at those. Sorry, I meant to say:
this deployment I'm running also creates a subflow just before the parent flow completes
@Nate sorry if I'm using the wrong terminology. My parent flow (which is created with `run_deployment()`) will also use `run_deployment()` with a timeout of 0 at the end of the flow. In other words, a parent deployment run is creating a child deployment run. I'm thinking maybe I just need to increase the timeout for the child deployment run, so that the parent flow completes only once the child flow completes?

Nate

05/11/2023, 7:19 PM
gotcha - so where do you want to be able to check that a child deployment run completed? in the for loop where the parent is triggered, or within the parent? i assume you want to check that the child completed from the for loop because you're using `timeout=0` in the parent?

Blake Stefansen

05/11/2023, 7:21 PM
ideally in the for loop where I'm creating the parent deployment runs, but I don't think that's possible? (because the child deployment run has a timeout of 0, and the future for the parent doesn't contain anything like `subflow_id` that I can see)
well, I could probably just use the parent flow run IDs to get the subflow IDs via the REST API, but maybe that's not the best approach

Nate

05/11/2023, 7:24 PM
but I don't think thats possible?
I believe you're correct, because if you use `timeout=0` when calling `run_deployment` in the parent, it won't (can't) return a `FlowRun` that represents the finished state of the child run, since the parent would have to wait for the child to finish in order to report the `FlowRun` back to the for loop's context
but yeah, you should still be able to return the `FlowRun` that `run_deployment(..., timeout=0)` gives you in order to fetch the child flow result from the API later
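As a rough sketch of that last suggestion from the test's side: suppose each parent run somehow hands back the ID of the child run it started (for example by returning it from the parent flow; how that value is retrieved is not shown and is an assumption). The check in the for loop could then cover both runs. `RunPair`, `find_incomplete_runs`, and `fetch_state_name` are hypothetical names, and the state lookup is passed in as a callable so it can be backed by the Prefect client or the REST API.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, NamedTuple


class RunPair(NamedTuple):
    parent_state_name: str  # e.g. response.state_name for the parent run
    child_run_id: str       # hypothetical: child run ID reported by the parent


async def find_incomplete_runs(
    pairs: Iterable[RunPair],
    fetch_state_name: Callable[[str], Awaitable[str]],
) -> List[str]:
    """Return a description of every parent or child run that is not Completed."""
    pairs = list(pairs)
    # Look up all child states concurrently.
    child_states = await asyncio.gather(
        *(fetch_state_name(pair.child_run_id) for pair in pairs)
    )
    problems = []
    for pair, child_state in zip(pairs, child_states):
        if pair.parent_state_name != "Completed":
            problems.append(
                f"parent of {pair.child_run_id}: {pair.parent_state_name}"
            )
        if child_state != "Completed":
            problems.append(f"child {pair.child_run_id}: {child_state}")
    return problems
```

In the test, something like `assert await find_incomplete_runs(pairs, fetch_state_name) == []` would then fail with a readable list of the runs that did not finish. Because the child was started with `timeout=0`, it may still be running when the parent completes, so the lookup would likely need to wait or retry before reporting a state.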

Blake Stefansen

05/11/2023, 7:28 PM
ok cool, thank you for helping me talk this through!

Nate

05/11/2023, 7:29 PM
sure thing!