
Scott Walsh

11/17/2022, 12:15 AM
Prefect 2.4.5. I have a parent_flow that is registered to a deployment in Orion, and I am using a Kubernetes block to run this parent_flow as a k8s job. Within that parent_flow I have two subflows, sub_flow1 and sub_flow2. Both of these subflows are also registered to their own deployments that run as k8s jobs. I am triggering the subflows using client.create_flow_run_from_deployment(**). Is there a way I can pass the results from sub_flow1 into sub_flow2 here? And can I somehow wait to start sub_flow2 until sub_flow1 is complete? The reason the subflows are built as their own deployments is so they can run on different resources; maybe one needs a GPU and the other does not. Here is some code for what I am describing:
import asyncio
from prefect.client import get_client
from prefect import flow, task

@task
async def child_task1():
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(name='sub_flow1/sub_flow1')
        # Note: this only triggers the run; it does not wait for completion
        response = await client.create_flow_run_from_deployment(deployment.id)

@task
async def child_task2():
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(name='sub_flow2/sub_flow2')
        response = await client.create_flow_run_from_deployment(deployment.id)

@flow
async def parent_flow():
    await child_task1.submit()
    await child_task2.submit()

if __name__ == "__main__":
    asyncio.run(parent_flow())

Anna Geller

11/17/2022, 12:16 AM
Subflows are blocking, so they wait for completion by default. This post might help clarify: https://medium.com/the-prefect-blog/modular-data-stack-build-a-data-platform-with-prefect-dbt-and-snowflake-part-7-95f908d29a01

Scott Walsh

11/17/2022, 7:21 PM
This is exactly what I needed! When using run_deployment, is there any way to get results back from the deployment? I see state in the output, but when I try to call .result() on it I see:
MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.

Anna Geller

11/17/2022, 7:22 PM
perhaps try:
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT='True'
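For example (the work queue name is hypothetical), the setting can be applied via the CLI profile or as an environment variable on the agent process:

```shell
# Persist results for all flows by default (Prefect 2.x setting)
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true

# or, as an environment variable before starting the agent
export PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
prefect agent start -q default
```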

Scott Walsh

11/17/2022, 7:32 PM
Where might that need to be set... Orion? agent? k8s job? locally where I run run_deployment?

Anna Geller

11/17/2022, 10:19 PM
Good Q! You set it from wherever you execute this, so you could attach it to your infrastructure block (every block has env vars), set it before you start the agent, or include it in the agent's env vars.

Scott Walsh

11/18/2022, 8:13 PM
Thanks @Anna Geller! To wrap up, I resolved this by:
1. setting PREFECT_RESULTS_PERSIST_BY_DEFAULT='True' in the agent
2. creating a new block for result storage
3. pointing my flow to that storage:
@flow(result_storage='gcs/<my-block>')
a = run_deployment('<my-flow>/<my-deployment>')
a.state.result()