Hi <@ULVA73B9P>, is there a way to run a sub-flow ...
# marvin-in-the-wild
s
Hi @Marvin, is there a way to run a sub-flow as part of a flow where all I have is its name and not a deployment? I need to split my flows into parent and child and don't want to handle too many deployment issues and variables management - can the subflow triggering be as simple as a flow name found in code (take the latest available)?
m
So you want to trigger a sub-flow using just the flow name without dealing with deployment complexities, right? Let me search the Prefect docs for something that matches your requirements. First, which version of Prefect are you using?
s
prefect 2.20 @Marvin
m
You can run a sub-flow within a parent flow by simply calling the sub-flow function within the parent flow function. This inline approach allows you to define multiple flows in the same file and execute one as part of another flow execution without needing separate deployments. Here are the key points: 1. Calling a Sub-flow: Just call the sub-flow function within your parent flow.
```python
from prefect import flow, task

@task
def print_message(msg):
    print(msg)

@flow
def child_flow(name):
    print_message(f"Hello {name}!")

@flow
def parent_flow():
    child_flow("World")

if __name__ == "__main__":
    parent_flow()
```
2. Flow Relationships:
- A special task run in the parent flow mirrors the state of the child flow run; the sub-flow run itself behaves like a normal flow run.
- A sub-flow run can be identified in the backend via the `parent_task_run_id` on its `state_details`.
3. Limitations:
- Inline subflow runs cannot be canceled without canceling their parent flow run.
For more detailed information, you can refer to the Prefect 2.x documentation on flows and subflows:
- Prefect Flows Concepts
- Prefect Deployments Concepts
Feel free to ask if you need further clarification.
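Because an inline sub-flow is just a Python function call, looking a child flow up by its name can be done in plain Python. Here is a minimal sketch, assuming the child flows live in an importable module. The helper `find_flow_by_name` and the module name `my_flows` used below are hypothetical and not part of Prefect; the helper relies only on flow objects being callable and carrying a `name` attribute.

```python
import importlib
from typing import Any, Callable


def find_flow_by_name(module_name: str, flow_name: str) -> Callable[..., Any]:
    """Return the callable in `module_name` whose `name` attribute equals `flow_name`.

    Hypothetical helper (not a Prefect API): it scans the module's top-level
    objects and matches on the `name` attribute that flow objects carry.
    """
    module = importlib.import_module(module_name)
    for obj in vars(module).values():
        if callable(obj) and getattr(obj, "name", None) == flow_name:
            return obj
    raise LookupError(f"No flow named {flow_name!r} found in module {module_name!r}")
```

Inside a parent flow, `find_flow_by_name("my_flows", "child-flow")("World")` would then run the child inline as a sub-flow. Note that, unless a name is set explicitly, Prefect 2 names a flow after its function with underscores replaced by hyphens, so `child_flow` becomes `child-flow`.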
s
Can I use this triggering pattern to run on resources external to the parent flow (e.g. separate workers) @Marvin? Or is it limited to the parent's workers only?
m
To run sub-flows on resources separate from the parent flow's, you typically need to create deployments and trigger them with `run_deployment`. Inline sub-flows, by contrast, run in the same process as the parent flow, so they share its resources. Here's an example using deployments: 1. Define the Sub-flow and Deploy It:
```python
from prefect import flow

@flow
def child_flow(name):
    print(f"Hello {name}!")
```
Deploy `child_flow` separately, e.g., using the CLI:
```bash
# --apply registers the built deployment with the API in the same step
prefect deployment build -n child-flow-deployment child_flow.py:child_flow --apply
```
2. Run Deployment in Parent Flow:
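```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent_flow():
    # The name has the form "<flow name>/<deployment name>";
    # the flow function child_flow gets the default flow name "child-flow".
    run_deployment(
        name="child-flow/child-flow-deployment",
        parameters={"name": "World"},  # child_flow requires a `name` argument
    )

if __name__ == "__main__":
    parent_flow()
```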
This approach allows `child_flow` to run on separate workers from `parent_flow`, depending on the work pool the deployment is assigned to. The deployment mechanism enables the distribution of work across different resources. For further insights and advanced options, check the documentation:
- Running a Deployment from Code
If you encounter any issues or need more details, feel free to ask!
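To make the naming convention and the hand-off to another worker concrete, here is a minimal sketch. It assumes the child deployment has already been registered and that a worker is polling the work pool it is assigned to. The helper names `deployment_identifier` and `trigger_child` are hypothetical. `run_deployment` expects a name of the form `"<flow name>/<deployment name>"`.

```python
def deployment_identifier(flow_function_name: str, deployment_name: str) -> str:
    """Build the "<flow name>/<deployment name>" string used by run_deployment.

    Assumes the flow kept its default name, which Prefect 2 derives from the
    function name by replacing underscores with hyphens.
    """
    flow_name = flow_function_name.replace("_", "-")
    return f"{flow_name}/{deployment_name}"


def trigger_child(name: str):
    """Schedule a run of the child deployment and wait for it to finish.

    Intended to be called from inside the parent flow.
    """
    # Imported here so the helper above stays usable without Prefect installed.
    from prefect.deployments import run_deployment

    return run_deployment(
        name=deployment_identifier("child_flow", "child-flow-deployment"),
        parameters={"name": name},  # forwarded to child_flow(name)
    )
```

By default `run_deployment` polls until the child run finishes; passing `timeout=0` returns the scheduled flow run immediately instead of waiting.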
s
@Marvin is there a way to get the latest successful deployment id of a given flow, by flow-name, in API? (still using prefect 2.20)
m
Yes, you can use the Prefect API to get the latest successful deployment ID of a given flow by its name. Deployments have no success state of their own, so "successful" here means that the deployment's most recent flow run completed. Here's a step-by-step guide to accomplish this: 1. Use the Prefect Client: First, instantiate the Prefect client in your script. 2. Query for Deployments: Use the client to query for deployments by flow name and sort them by `created` date to get the latest. 3. Check Flow Run State: Filter the deployments to find the latest one whose most recent flow run completed. Here's a sample script:
```python
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import (
    DeploymentFilter,
    DeploymentFilterId,
    FlowFilter,
    FlowFilterName,
)
from prefect.client.schemas.objects import StateType
from prefect.client.schemas.sorting import DeploymentSort, FlowRunSort


async def get_latest_successful_deployment_id(flow_name):
    async with get_client() as client:
        # Get all deployments for the flow, newest first
        deployments = await client.read_deployments(
            flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])),
            sort=DeploymentSort.CREATED_DESC,
        )

        # Return the newest deployment whose most recent flow run completed
        for deployment in deployments:
            runs = await client.read_flow_runs(
                deployment_filter=DeploymentFilter(
                    id=DeploymentFilterId(any_=[deployment.id])
                ),
                sort=FlowRunSort.START_TIME_DESC,
                limit=1,
            )
            if runs and runs[0].state and runs[0].state.type == StateType.COMPLETED:
                return deployment.id

        return None


# Usage
flow_name = "your-flow-name"
deployment_id = asyncio.run(get_latest_successful_deployment_id(flow_name))
print(f"Latest successful deployment ID: {deployment_id}")
```
This script asynchronously queries the Prefect API to find the latest successful deployment for a given flow name. Refer to the Prefect documentation for more details on querying deployments and flow runs: - Prefect API Client If you have any further questions, feel free to ask!
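The selection logic itself is independent of the API calls and can be checked on plain data. Here is a minimal sketch, where `DeploymentRecord` is a hypothetical stand-in for what the client returns: an ID, a creation time, and the state of the deployment's most recent flow run.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class DeploymentRecord:
    """Hypothetical, simplified view of a deployment and its latest run."""
    deployment_id: str
    created: datetime
    last_run_state: Optional[str]  # e.g. "COMPLETED", "FAILED", or None if never run


def latest_successful(records: List[DeploymentRecord]) -> Optional[str]:
    """Return the ID of the newest deployment whose latest run completed."""
    for record in sorted(records, key=lambda r: r.created, reverse=True):
        if record.last_run_state == "COMPLETED":
            return record.deployment_id
    return None


records = [
    DeploymentRecord("dep-old", datetime(2024, 1, 1), "COMPLETED"),
    DeploymentRecord("dep-mid", datetime(2024, 3, 1), "COMPLETED"),
    DeploymentRecord("dep-new", datetime(2024, 5, 1), "FAILED"),
]
print(latest_successful(records))  # prints: dep-mid
```

The newest deployment is skipped because its latest run failed, so the next most recent one with a completed run is returned.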