Ponraj
06/21/2024, 3:13 PMMarvin
06/21/2024, 3:13 PMclient.create_flow_run_from_deployment
, you can use the response from this method directly. It returns a FlowRun
object that includes the id
attribute.
Here is a simple example:
python
from prefect.client import get_client
async def create_flow_run_and_get_id(deployment_id: str):
async with get_client() as client:
flow_run = await client.create_flow_run_from_deployment(deployment_id)
flow_run_id = flow_run.id
return flow_run_id
# Example usage
import asyncio
deployment_id = "your-deployment-id"
flow_run_id = asyncio.run(create_flow_run_and_get_id(deployment_id))
print(f"The generated Flow ID is: {flow_run_id}")
In this example:
1. get_client()
obtains a client to interact with the Prefect API.
2. client.create_flow_run_from_deployment(deployment_id)
creates a flow run from the specified deployment and returns a FlowRun
object.
3. flow_run.id
extracts the Flow ID from the returned object.
You can find more details in the Prefect documentation.
For any additional issues related to this, you might find related discussions and resolutions on GitHub here.Ponraj
06/21/2024, 3:17 PMMarvin
06/21/2024, 3:17 PMcreate_flow_run_from_deployment
, the PrefectClient
provides several other useful methods for interacting with flow runs and deployments. Here are a few notable ones:
### 1. reschedule_late_flow_runs
This method can be used to bulk reschedule flow runs that are late. This example reschedules the last 3 late flow runs of a deployment to run 6 hours later than their original expected start time:
python
import asyncio
from datetime import datetime, timedelta, timezone
from prefect import get_client
from prefect.client.schemas import FlowRunFilter, DeploymentFilter, FlowRunSort
async def reschedule_late_flow_runs(deployment_name: str, delay: timedelta, most_recent_n: int, delete_remaining: bool = True, states: Optional[list[str]] = None) -> list[FlowRun]:
if not states:
states = ["Late"]
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=dict(name=dict(any_=states)),
expected_start_time=dict(before_=datetime.now(timezone.utc)),
),
deployment_filter=DeploymentFilter(name={'like_': deployment_name}),
sort=FlowRunSort.START_TIME_DESC,
limit=most_recent_n if not delete_remaining else None
)
if not flow_runs:
print(f"No flow runs found in states: {states!r}")
return []
rescheduled_flow_runs = []
for i, run in enumerate(flow_runs):
await client.delete_flow_run(flow_run_id=run.id)
if i < most_recent_n:
new_run = await client.create_flow_run_from_deployment(deployment_id=run.deployment_id)
rescheduled_flow_runs.append(new_run)
return rescheduled_flow_runs
# Usage example
deployment_name = "healthcheck-storage-test"
delay = timedelta(hours=6)
most_recent_n = 3
rescheduled_runs = asyncio.run(reschedule_late_flow_runs(deployment_name, delay, most_recent_n))
### 2. read_flow_runs
Use this method to retrieve details about existing flow runs based on various filters:
python
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=dict(name=dict(any_=["Completed", "Failed"]))
)
)
for flow_run in flow_runs:
print(flow_run)
### 3. delete_flow_run
Delete a specific flow run by ID:
python
async with get_client() as client:
await client.delete_flow_run(flow_run_id="your-flow-run-id")
### 4. create_flow_run
Directly create a flow run for an arbitrary flow without using a deployment:
python
async with get_client() as client:
flow_run = await client.create_flow_run(flow_id="your-flow-id")
print(f"Created flow run with ID: {flow_run.id}")
For more detailed information and examples, you can refer to the Prefect documentation.
By utilizing these various client methods, you can manage and manipulate flow runs in a multitude of ways to suit your orchestration needs.