<@ULVA73B9P> I started the deployment using clien...
# marvin-in-the-wild
p
@Marvin I started the deployment using client.create_flow_run_from_deployment and running a function. Now I want to get the generated flow ID ? How it works?
m
To retrieve the generated Flow ID after creating a flow run from a deployment using
client.create_flow_run_from_deployment
, you can use the response from this method directly. It returns a
FlowRun
object that includes the
id
attribute. Here is a simple example:
Copy code
python
from prefect.client import get_client

async def create_flow_run_and_get_id(deployment_id: str):
    async with get_client() as client:
        flow_run = await client.create_flow_run_from_deployment(deployment_id)
        flow_run_id = flow_run.id
        return flow_run_id

# Example usage
import asyncio
deployment_id = "your-deployment-id"
flow_run_id = asyncio.run(create_flow_run_and_get_id(deployment_id))
print(f"The generated Flow ID is: {flow_run_id}")
In this example: 1.
get_client()
obtains a client to interact with the Prefect API. 2.
client.create_flow_run_from_deployment(deployment_id)
creates a flow run from the specified deployment and returns a
FlowRun
object. 3.
flow_run.id
extracts the Flow ID from the returned object. You can find more details in the Prefect documentation. For any additional issues related to this, you might find related discussions and resolutions on GitHub here.
p
@Marvin What are the other methods ?
m
In addition to
create_flow_run_from_deployment
, the
PrefectClient
provides several other useful methods for interacting with flow runs and deployments. Here are a few notable ones: ### 1. reschedule_late_flow_runs This method can be used to bulk reschedule flow runs that are late. This example reschedules the last 3 late flow runs of a deployment to run 6 hours later than their original expected start time:
Copy code
python
import asyncio
from datetime import datetime, timedelta, timezone
from prefect import get_client
from prefect.client.schemas import FlowRunFilter, DeploymentFilter, FlowRunSort

async def reschedule_late_flow_runs(deployment_name: str, delay: timedelta, most_recent_n: int, delete_remaining: bool = True, states: Optional[list[str]] = None) -> list[FlowRun]:
    if not states:
        states = ["Late"]
    
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=dict(name=dict(any_=states)),
                expected_start_time=dict(before_=datetime.now(timezone.utc)),
            ),
            deployment_filter=DeploymentFilter(name={'like_': deployment_name}),
            sort=FlowRunSort.START_TIME_DESC,
            limit=most_recent_n if not delete_remaining else None
        )
        
        if not flow_runs:
            print(f"No flow runs found in states: {states!r}")
            return []
        
        rescheduled_flow_runs = []
        for i, run in enumerate(flow_runs):
            await client.delete_flow_run(flow_run_id=run.id)
            if i < most_recent_n:
                new_run = await client.create_flow_run_from_deployment(deployment_id=run.deployment_id)
                rescheduled_flow_runs.append(new_run)
        
        return rescheduled_flow_runs

# Usage example
deployment_name = "healthcheck-storage-test"
delay = timedelta(hours=6)
most_recent_n = 3
rescheduled_runs = asyncio.run(reschedule_late_flow_runs(deployment_name, delay, most_recent_n))
### 2. read_flow_runs Use this method to retrieve details about existing flow runs based on various filters:
Copy code
python
async with get_client() as client:
    flow_runs = await client.read_flow_runs(
        flow_run_filter=FlowRunFilter(
            state=dict(name=dict(any_=["Completed", "Failed"]))
        )
    )
    for flow_run in flow_runs:
        print(flow_run)
### 3. delete_flow_run Delete a specific flow run by ID:
Copy code
python
async with get_client() as client:
    await client.delete_flow_run(flow_run_id="your-flow-run-id")
### 4. create_flow_run Directly create a flow run for an arbitrary flow without using a deployment:
Copy code
python
async with get_client() as client:
    flow_run = await client.create_flow_run(flow_id="your-flow-id")
    print(f"Created flow run with ID: {flow_run.id}")
For more detailed information and examples, you can refer to the Prefect documentation. By utilizing these various client methods, you can manage and manipulate flow runs in a multitude of ways to suit your orchestration needs.