Prefect 2.4.5 I have a parent_flow that is regist...
# ask-community
s
Prefect 2.4.5 I have a parent_flow that is registered to a deployment in Orion. I am using a kubernetes block to run this parent_flow as a k8s job. Within that parent_flow I have two sub flows, sub_flow1 and sub_flow2. Both of these sub flows are also registered to their own deployments that run as k8s jobs. I am triggering the subflows using client.create_flow_run_from_deployment(**). Is there a way that I can pass the results from sub_flow1 into sub_flow2 here? And, can I somehow wait to start sub_flow2 until sub_flow1 is complete? The reason I have the sub flows built as their own deployments is so they can run using different resources, maybe one needs a GPU and the other does not. Here is some code for what I am describing:
Copy code
import asyncio
from prefect.client import get_client
from prefect import flow, task

@task
async def child_task1():
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(name='sub_flow1/sub_flow1')
        response = await client.create_flow_run_from_deployment(deployment.id)

@task
async def child_task2():
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(name='sub_flow2/sub_flow2')
        response = await client.create_flow_run_from_deployment(deployment.id)

@flow
async def parent_flow():
    await child_task1.submit()
    await child_task2.submit()

if __name__ == "__main__":
    asyncio.run(parent_flow())
1
a
subflows are blocking and thus waiting for completion by default. - this post might help clarify https://medium.com/the-prefect-blog/modular-data-stack-build-a-data-platform-with-prefect-dbt-and-snowflake-part-7-95f908d29a01
gratitude thank you 1
s
This is exactly what I needed! When using
run_deployment
, is there anyway to get results back from the the deployment? I see
state
in the output, but when I try to get .result() from it I see
MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
a
perhaps try:
Copy code
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT='True'
s
Where might that need to be set... Orion? agent? k8s job? locally where I run
run_deployment
?
a
good Q you set it from where you execute this, so you could attach this to your infrastructure block (every block has env vars) or before you start the agent, or as part of agent env vars
s
Thanks @Anna Geller! To wrap up, I resolved this by 1. setting
PREFECT_RESULTS_PERSIST_BY_DEFAULT='True'
in the agent 2. creating a new block for result storage 3. pointing my flow to that storage
@flow(result_storage='gcs/<my-block>')
Copy code
a = run_deployment('<my-flow>/<my-deployment')
a.state.result()
🙌 1
gratitude thank you 1