<@ULVA73B9P> can you give me a code snippet to get...
# ask-marvin
l
@Marvin can you give me a code snippet to get the flow runs using client sdk that are running based on deployment name?
m
thought for 85.3 seconds
Based on my research and the module structure, I can now provide you with a correct code snippet to query flow runs by deployment name using the Prefect Client SDK. Here's how to do it:
python
from prefect import get_client  # exported from the top-level prefect package
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName

async def get_running_flow_runs(deployment_name: str):
    # Create an async client
    async with get_client() as client:
        # Create a deployment filter for the specific deployment name
        deployment_filter = DeploymentFilter(
            name=DeploymentFilterName(any_=[deployment_name])
        )
        
        # Query flow runs with the deployment filter
        flow_runs = await client.read_flow_runs(
            deployment_filter=deployment_filter,
            # Only get runs that are in a running state
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": ["RUNNING"]}}
            )
        )
        
        return flow_runs

# Usage example:
import asyncio

async def main():
    flow_runs = await get_running_flow_runs("your-deployment-name")
    for run in flow_runs:
        print(f"Flow Run ID: {run.id}")
        print(f"State: {run.state.type}")
        print(f"Start Time: {run.start_time}")
        print("---")

# Run the async function
asyncio.run(main())
This code will:
1. Create a client connection
2. Filter flow runs by the specified deployment name
3. Further filter to only show running flow runs
4. Return the matching flow runs

A few important notes:
- This is an async implementation, which is recommended for Prefect 3.x
- The code uses the `get_client()` function, which will automatically handle the connection to your Prefect server
- The `DeploymentFilter` and `FlowRunFilter` allow you to specify exactly what you're looking for
- You can modify the state filter to look for other states if needed (e.g., "PENDING", "COMPLETED", "FAILED")

If you need a synchronous version instead, you can use the synchronous client:
python
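from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName

def get_running_flow_runs(deployment_name: str):
    # sync_client=True returns the synchronous client; the default client is async
    with get_client(sync_client=True) as client: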
        deployment_filter = DeploymentFilter(
            name=DeploymentFilterName(any_=[deployment_name])
        )
        
        flow_runs = client.read_flow_runs(
            deployment_filter=deployment_filter,
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": ["RUNNING"]}}
            )
        )
        
        return flow_runs
Both versions will achieve the same result, but the async version is generally recommended for better performance, especially when making multiple API calls.
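If you want to look for other states, a small helper can build the nested dict that the snippets above pass as `FlowRunFilter(state=...)`. This is only a sketch: `build_state_filter` and `KNOWN_STATE_TYPES` are hypothetical names, and the list of state types is an assumption to check against your Prefect version.

```python
# Assumption: these names match Prefect's state types; verify for your version.
KNOWN_STATE_TYPES = {
    "SCHEDULED", "PENDING", "RUNNING", "COMPLETED",
    "FAILED", "CANCELLED", "CRASHED", "PAUSED", "CANCELLING",
}


def build_state_filter(*state_types: str) -> dict:
    """Return the nested dict used as FlowRunFilter(state=...) in this thread."""
    if not state_types:
        raise ValueError("Provide at least one state type")
    # Normalize to upper case so "running" and "RUNNING" behave the same
    normalized = [s.upper() for s in state_types]
    unknown = [s for s in normalized if s not in KNOWN_STATE_TYPES]
    if unknown:
        raise ValueError(f"Unknown state type(s): {unknown}")
    return {"type": {"any_": normalized}}


# Same shape as the hard-coded filter in the snippets above
print(build_state_filter("running"))
print(build_state_filter("pending", "failed"))
```

You would then write `FlowRunFilter(state=build_state_filter("pending", "failed"))` in place of the hard-coded dict.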
l
@Marvin can you add a limit here marvin to only give one result
m
thought for 9.6 seconds
Ah, limiting results to avoid being overwhelmed by data. A very sensible request. Let me modify the code to return just one result.
python
from prefect import get_client  # exported from the top-level prefect package
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName

async def get_running_flow_runs(deployment_name: str):
    # Create an async client
    async with get_client() as client:
        # Create a deployment filter for the specific deployment name
        deployment_filter = DeploymentFilter(
            name=DeploymentFilterName(any_=[deployment_name])
        )
        
        # Query flow runs with the deployment filter and limit=1
        flow_runs = await client.read_flow_runs(
            deployment_filter=deployment_filter,
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": ["RUNNING"]}}
            ),
            limit=1  # This will return only one result
        )
        
        return flow_runs

# Usage example:
import asyncio

async def main():
    flow_runs = await get_running_flow_runs("your-deployment-name")
    if flow_runs:
        run = flow_runs[0]  # Get the single result
        print(f"Flow Run ID: {run.id}")
        print(f"State: {run.state.type}")
        print(f"Start Time: {run.start_time}")
    else:
        print("No running flow runs found for this deployment")

# Run the async function
asyncio.run(main())
And here's the synchronous version if you prefer:
python
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName

def get_running_flow_runs(deployment_name: str):
    # sync_client=True returns the synchronous client; the default client is async
    with get_client(sync_client=True) as client:
        deployment_filter = DeploymentFilter(
            name=DeploymentFilterName(any_=[deployment_name])
        )
        
        flow_runs = client.read_flow_runs(
            deployment_filter=deployment_filter,
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": ["RUNNING"]}}
            ),
            limit=1  # This will return only one result
        )
        
        return flow_runs
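The key change is the addition of `limit=1` in the `read_flow_runs` call. This caps the response at a single matching flow run instead of all of them. Note that `limit=1` on its own does not guarantee you get the most recent run; for that, also pass a sort order such as `sort=FlowRunSort.START_TIME_DESC` (from `prefect.client.schemas.sorting`). Much more efficient when you only need one run, rather like knowing where your towel is - you really only need to know about one towel at a time.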
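Which run comes back with `limit=1` depends on the order the server returns results in. If "most recent" matters, one option is to fetch a small batch and choose on the client side. The sketch below assumes each run exposes a `start_time` attribute, as the runs printed above do; `latest_run` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins for real flow runs.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def latest_run(flow_runs):
    """Return the run with the latest start_time, or None if none has started."""
    # Runs without a start_time are skipped because they cannot be compared
    started = [run for run in flow_runs if run.start_time is not None]
    if not started:
        return None
    return max(started, key=lambda run: run.start_time)


# Stand-in objects for illustration only; real runs come from read_flow_runs
runs = [
    SimpleNamespace(id="a", start_time=datetime(2024, 1, 1, tzinfo=timezone.utc)),
    SimpleNamespace(id="b", start_time=datetime(2024, 1, 2, tzinfo=timezone.utc)),
    SimpleNamespace(id="c", start_time=None),
]
print(latest_run(runs).id)
```

This costs a slightly larger response than `limit=1`, so it is mainly useful when you already need more than one run.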
l
@Marvin Can you also extract the url of the flow from the prefect server?
m
thought for 86.9 seconds
Based on my research, I can modify the previous code to include the UI URL for the flow run. Here's the updated version:
python
from prefect import get_client  # exported from the top-level prefect package
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName

async def get_running_flow_runs(deployment_name: str):
    # Create an async client
    async with get_client() as client:
        # Create a deployment filter for the specific deployment name
        deployment_filter = DeploymentFilter(
            name=DeploymentFilterName(any_=[deployment_name])
        )
        
        # Query flow runs with the deployment filter and limit=1
        flow_runs = await client.read_flow_runs(
            deployment_filter=deployment_filter,
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": ["RUNNING"]}}
            ),
            limit=1
        )
        
        return flow_runs

# Usage example:
import asyncio
from prefect.settings import PREFECT_UI_URL

async def main():
    flow_runs = await get_running_flow_runs("your-deployment-name")
    if flow_runs:
        run = flow_runs[0]  # Get the single result
        print(f"Flow Run ID: {run.id}")
        print(f"State: {run.state.type}")
        print(f"Start Time: {run.start_time}")
        
        # Construct the UI URL for the flow run
        ui_url = f"{PREFECT_UI_URL.value()}/flow-runs/flow-run/{run.id}"
        print(f"UI URL: {ui_url}")
    else:
        print("No running flow runs found for this deployment")

# Run the async function
asyncio.run(main())
And here's the synchronous version:
python
from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName
from prefect.settings import PREFECT_UI_URL

def get_running_flow_runs(deployment_name: str):
    # sync_client=True returns the synchronous client; the default client is async
    with get_client(sync_client=True) as client:
        deployment_filter = DeploymentFilter(
            name=DeploymentFilterName(any_=[deployment_name])
        )
        
        flow_runs = client.read_flow_runs(
            deployment_filter=deployment_filter,
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": ["RUNNING"]}}
            ),
            limit=1
        )
        
        if flow_runs:
            run = flow_runs[0]
            ui_url = f"{PREFECT_UI_URL.value()}/flow-runs/flow-run/{run.id}"
            return run, ui_url
        return None, None

# Usage
run, ui_url = get_running_flow_runs("your-deployment-name")
if run:
    print(f"Flow Run ID: {run.id}")
    print(f"State: {run.state.type}")
    print(f"Start Time: {run.start_time}")
    print(f"UI URL: {ui_url}")
else:
    print("No running flow runs found for this deployment")
The key additions are:
1. We import `PREFECT_UI_URL` from `prefect.settings`
2. We construct the UI URL using the format `{PREFECT_UI_URL}/flow-runs/flow-run/{flow_run_id}`

This will give you a direct link to view the flow run in the Prefect UI. Because it is built from your configured UI URL setting, the same code applies to Prefect Cloud and a self-hosted server. The path segment can differ between Prefect versions (to my knowledge, 3.x UIs use `/runs/flow-run/{flow_run_id}`), so compare it against a URL copied from your own UI.

Note that `PREFECT_UI_URL.value()` returns the UI URL derived from your Prefect configuration, and it may be unset if no API URL is configured. If you're running a local Prefect server, this will typically be <http://localhost:4200>, and if you're using Prefect Cloud, it will be a URL under <https://app.prefect.cloud> that includes your account and workspace.
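If you'd rather not repeat the f-string, the URL construction can be wrapped in a small helper that tolerates a missing base URL and a trailing slash. This is a sketch: `build_flow_run_url` and its `path` parameter are hypothetical, and the default path simply mirrors the format used in this thread.

```python
from typing import Optional
from uuid import UUID


def build_flow_run_url(
    base_url: Optional[str],
    flow_run_id,
    path: str = "flow-runs/flow-run",  # assumption: path format from this thread
) -> Optional[str]:
    """Join a UI base URL, a path segment, and a flow run ID into one link."""
    if not base_url:
        # No UI URL configured, so there is nothing sensible to link to
        return None
    # Strip slashes so the pieces join with exactly one "/" between them
    return f"{base_url.rstrip('/')}/{path.strip('/')}/{flow_run_id}"


print(build_flow_run_url("http://localhost:4200/", UUID(int=1)))
print(build_flow_run_url(None, UUID(int=1)))
```

In the snippets above you would call it as `build_flow_run_url(PREFECT_UI_URL.value(), run.id)`, passing a different `path` if your UI uses another route.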