Dominic Tarro
05/06/2023, 12:04 PMDominic Tarro
05/06/2023, 11:29 PMfrom prefect import Flow
from prefect.cli.deployment import get_deployment
from prefect.client.orchestration import get_client
from prefect.engine import FlowRun
from prefect.server.schemas.responses import DeploymentResponse
from prefect.states import Scheduled, State
async def trigger_flow_run_on_state(flow: Flow, flow_run: FlowRun, state: State):
"""Triggers a deployment to run via Prefect `on_completion`, `on_failure`, or `on_crashed`."""
# Get the client to communicate with the Prefect server
async with get_client() as client:
# Get the deployment to run.
# TODO Update the {flow.name} and {deployment.name} to match your deployment.
deployment: DeploymentResponse = await get_deployment(
client, name="{flow.name}/{deployment.name}", deployment_id=None
)
# Submits the deployment to be run immediately.
# Returns the FlowRun object.
hooked_flow_run: FlowRun = await client.create_flow_run_from_deployment(
deployment.id,
state=Scheduled(),
)