Srul Pinkas
09/10/2024, 4:50 PMMarvin
09/10/2024, 4:50 PMSrul Pinkas
09/10/2024, 4:51 PMMarvin
09/10/2024, 5:16 PMpython
from prefect import flow, task
@task
def print_message(msg):
print(msg)
@flow
def child_flow(name):
print_message(f"Hello {name}!")
@flow
def parent_flow():
child_flow("World")
if __name__ == "__main__":
parent_flow()
2. Flow Relationships:
- Subflows mirror the state of child flow runs and behave like normal flow runs.
- A sub-flow can be identified in the backend via a parent_task_run_id
.
3. Limitations:
- Inline subflow runs cannot be canceled without canceling their parent flow run.
For more detailed information, you can refer to Prefect 2.x documentation on flows and subflows:
- Prefect Flows Concepts
- Prefect Deployments Concepts
Feel free to ask if you need further clarification.Srul Pinkas
09/10/2024, 5:41 PMMarvin
09/10/2024, 5:41 PMrun_deployment
. However, inline sub-flows run within the same context as the parent flow, so the resources would be shared.
For triggering sub-flows on separate workers, you'd need to use Prefect's deployment mechanism. Here's an example using deployments:
1. Define the Sub-flow and Deploy It:
python
from prefect import flow
@flow
def child_flow(name):
print(f"Hello {name}!")
Deploy child_flow
separately, e.g., using the CLI:
bash
prefect deployment build -n child-flow-deployment child_flow.py:child_flow
2. Run Deployment in Parent Flow:
python
from prefect import flow
from prefect.deployments import run_deployment
@flow
def parent_flow():
run_deployment(name="child-flow-deployment/child-flow-deployment")
if __name__ == "__main__":
parent_flow()
This approach allows child_flow
to run on separate workers from parent_flow
. The deployment mechanism enables the distribution of work across different resources.
For further insights and advanced options, check the documentation:
- Running a Deployment from Code
If you encounter any issues or need more details, feel free to ask!Srul Pinkas
09/11/2024, 4:36 PMMarvin
09/11/2024, 4:36 PMcreated
date to get the latest.
3. Check Deployment Status: Filter the deployments to find the latest successful one.
Here's a sample script:
python
from prefect.client import get_client
from prefect.orion.schemas.states import StateType
async def get_latest_successful_deployment_id(flow_name):
async with get_client() as client:
# Get all deployments for the flow
deployments = await client.read_deployments(flow_name=flow_name, sort={"created": -1})
# Filter for the latest successful deployment
for deployment in deployments:
runs = await client.read_flow_runs(deployment_id=deployment.id, limit=1, sort={"updated": -1})
if runs and runs[0].state.type == StateType.COMPLETED:
return deployment.id
return None
# Usage
import asyncio
flow_name = "your-flow-name"
deployment_id = asyncio.run(get_latest_successful_deployment_id(flow_name))
print(f"Latest successful deployment ID: {deployment_id}")
This script asynchronously queries the Prefect API to find the latest successful deployment for a given flow name.
Refer to the Prefect documentation for more details on querying deployments and flow runs:
- Prefect API Client
If you have any further questions, feel free to ask!