Lester Cayabyab
07/22/2025, 8:49 PMMarvin
07/22/2025, 8:49 PMMarvin
07/22/2025, 8:51 PMpython
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName
async def get_running_flow_runs(deployment_name: str):
    """Fetch the flow runs of one deployment that are currently running.

    Args:
        deployment_name: Name of the deployment whose flow runs are queried.

    Returns:
        The result of ``client.read_flow_runs`` restricted to flow runs of
        ``deployment_name`` whose state type is ``RUNNING``.
    """
    # Match only the deployment with the requested name.
    by_deployment = DeploymentFilter(
        name=DeploymentFilterName(any_=[deployment_name])
    )
    # Keep only runs whose state type is RUNNING.
    running_only = FlowRunFilter(state={"type": {"any_": ["RUNNING"]}})

    # The async client is opened for the duration of the single query.
    async with get_client() as prefect_client:
        return await prefect_client.read_flow_runs(
            deployment_filter=by_deployment,
            flow_run_filter=running_only,
        )
# Usage example:
import asyncio
async def main():
    """Print the ID, state type and start time of every running flow run.

    Queries the placeholder deployment ``"your-deployment-name"`` through
    ``get_running_flow_runs`` and prints one ``---``-terminated record per
    flow run returned.
    """
    flow_runs = await get_running_flow_runs("your-deployment-name")
    for run in flow_runs:
        print(f"Flow Run ID: {run.id}")
        print(f"State: {run.state.type}")
        print(f"Start Time: {run.start_time}")
        print("---")


# Run the async function only when executed as a script, so that importing
# this module does not trigger an API call.
if __name__ == "__main__":
    asyncio.run(main())
This code will:
1. Create a client connection
2. Filter flow runs by the specified deployment name
3. Further filter to only show running flow runs
4. Return the matching flow runs
A few important notes:
- This is an async implementation, which is recommended for Prefect 3.x
- The code uses the `get_client()` function, which will automatically handle the connection to your Prefect server
- The `DeploymentFilter` and `FlowRunFilter` classes allow you to specify exactly what you're looking for
- You can modify the state filter to look for other states if needed (e.g., "PENDING", "COMPLETED", "FAILED")
If you need a synchronous version instead, you can use the synchronous client:
python
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName
def get_running_flow_runs(deployment_name: str):
    """Synchronously fetch the running flow runs of one deployment.

    Args:
        deployment_name: Name of the deployment whose flow runs are queried.

    Returns:
        The result of ``client.read_flow_runs`` restricted to flow runs of
        ``deployment_name`` whose state type is ``RUNNING``.
    """
    # get_client() hands back the asynchronous client by default, which can
    # only be entered with `async with`. Ask for the synchronous client
    # explicitly so that a plain `with` block and un-awaited calls work.
    with get_client(sync_client=True) as client:
        # Match only the deployment with the requested name.
        deployment_filter = DeploymentFilter(
            name=DeploymentFilterName(any_=[deployment_name])
        )
        flow_runs = client.read_flow_runs(
            deployment_filter=deployment_filter,
            # Only get runs that are in a running state.
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": ["RUNNING"]}}
            ),
        )
    return flow_runs
Both versions will achieve the same result, but the async version is generally recommended for better performance, especially when making multiple API calls.
Lester Cayabyab
07/22/2025, 9:10 PMMarvin
07/22/2025, 9:10 PMMarvin
07/22/2025, 9:10 PMpython
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName
async def get_running_flow_runs(deployment_name: str):
    """Fetch the most recently started running flow run of one deployment.

    Args:
        deployment_name: Name of the deployment whose flow runs are queried.

    Returns:
        The result of ``client.read_flow_runs``: a list holding at most one
        flow run of ``deployment_name`` whose state type is ``RUNNING``
        (empty when nothing matches).
    """
    # Local import keeps this snippet self-contained.
    from prefect.client.schemas.sorting import FlowRunSort

    async with get_client() as client:
        # Match only the deployment with the requested name.
        deployment_filter = DeploymentFilter(
            name=DeploymentFilterName(any_=[deployment_name])
        )
        flow_runs = await client.read_flow_runs(
            deployment_filter=deployment_filter,
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": ["RUNNING"]}}
            ),
            # `limit` alone does not decide WHICH run comes back; order by
            # start time, newest first, so the single result is the latest.
            sort=FlowRunSort.START_TIME_DESC,
            limit=1,  # This will return only one result
        )
    return flow_runs
# Usage example:
import asyncio
async def main():
    """Print details of the single running flow run, if there is one.

    Queries the placeholder deployment ``"your-deployment-name"`` through
    ``get_running_flow_runs`` and prints the first returned run's ID, state
    type and start time, or a notice when the result is empty.
    """
    flow_runs = await get_running_flow_runs("your-deployment-name")
    if flow_runs:
        run = flow_runs[0]  # Get the single result
        print(f"Flow Run ID: {run.id}")
        print(f"State: {run.state.type}")
        print(f"Start Time: {run.start_time}")
    else:
        print("No running flow runs found for this deployment")


# Run the async function only when executed as a script, so that importing
# this module does not trigger an API call.
if __name__ == "__main__":
    asyncio.run(main())
And here's the synchronous version if you prefer:
python
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName
def get_running_flow_runs(deployment_name: str):
    """Synchronously fetch the most recently started running flow run.

    Args:
        deployment_name: Name of the deployment whose flow runs are queried.

    Returns:
        The result of ``client.read_flow_runs``: a list holding at most one
        flow run of ``deployment_name`` whose state type is ``RUNNING``
        (empty when nothing matches).
    """
    # Local import keeps this snippet self-contained.
    from prefect.client.schemas.sorting import FlowRunSort

    # get_client() hands back the asynchronous client by default, which can
    # only be entered with `async with`. Ask for the synchronous client
    # explicitly so that a plain `with` block and un-awaited calls work.
    with get_client(sync_client=True) as client:
        # Match only the deployment with the requested name.
        deployment_filter = DeploymentFilter(
            name=DeploymentFilterName(any_=[deployment_name])
        )
        flow_runs = client.read_flow_runs(
            deployment_filter=deployment_filter,
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": ["RUNNING"]}}
            ),
            # `limit` alone does not decide WHICH run comes back; order by
            # start time, newest first, so the single result is the latest.
            sort=FlowRunSort.START_TIME_DESC,
            limit=1,  # This will return only one result
        )
    return flow_runs
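The key change is the addition of `limit=1` in the `read_flow_runs` call. This ensures you get at most one matching flow run instead of all of them. Note that `limit` on its own does not control which run is returned, so if you specifically need the most recent one, also pass a sort order such as `sort=FlowRunSort.START_TIME_DESC` (from `prefect.client.schemas.sorting`). Much more efficient when you only need the latest information, rather like knowing where your towel is - you really only need to know about one towel at a time.
Lester Cayabyab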
07/22/2025, 9:12 PMMarvin
07/22/2025, 9:12 PMMarvin
07/22/2025, 9:14 PMpython
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName
async def get_running_flow_runs(deployment_name: str):
    """Fetch the most recently started running flow run of one deployment.

    Args:
        deployment_name: Name of the deployment whose flow runs are queried.

    Returns:
        The result of ``client.read_flow_runs``: a list holding at most one
        flow run of ``deployment_name`` whose state type is ``RUNNING``
        (empty when nothing matches).
    """
    # Local import keeps this snippet self-contained.
    from prefect.client.schemas.sorting import FlowRunSort

    async with get_client() as client:
        # Match only the deployment with the requested name.
        deployment_filter = DeploymentFilter(
            name=DeploymentFilterName(any_=[deployment_name])
        )
        flow_runs = await client.read_flow_runs(
            deployment_filter=deployment_filter,
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": ["RUNNING"]}}
            ),
            # `limit` alone does not decide WHICH run comes back; order by
            # start time, newest first, so the single result is the latest.
            sort=FlowRunSort.START_TIME_DESC,
            limit=1,
        )
    return flow_runs
# Usage example:
import asyncio
from prefect.settings import PREFECT_UI_URL
async def main():
    """Print details and a UI link for the single running flow run, if any.

    Queries the placeholder deployment ``"your-deployment-name"`` through
    ``get_running_flow_runs`` and prints the first returned run's ID, state
    type, start time and UI URL, or a notice when the result is empty.
    """
    flow_runs = await get_running_flow_runs("your-deployment-name")
    if flow_runs:
        run = flow_runs[0]  # Get the single result
        print(f"Flow Run ID: {run.id}")
        print(f"State: {run.state.type}")
        print(f"Start Time: {run.start_time}")
        # Construct the UI URL for the flow run.
        # NOTE(review): if PREFECT_UI_URL is not configured, value() may be
        # None and the link would start with "None" - confirm for your setup.
        ui_url = f"{PREFECT_UI_URL.value()}/flow-runs/flow-run/{run.id}"
        print(f"UI URL: {ui_url}")
    else:
        print("No running flow runs found for this deployment")


# Run the async function only when executed as a script, so that importing
# this module does not trigger an API call.
if __name__ == "__main__":
    asyncio.run(main())
And here's the synchronous version:
python
from prefect.client import get_client
from prefect.client.schemas.filters import FlowRunFilter, DeploymentFilter, DeploymentFilterName
from prefect.settings import PREFECT_UI_URL
def get_running_flow_runs(deployment_name: str):
    """Synchronously fetch the latest running flow run and its UI link.

    Args:
        deployment_name: Name of the deployment whose flow runs are queried.

    Returns:
        A ``(run, ui_url)`` tuple for the matching flow run, or
        ``(None, None)`` when the deployment has no flow run whose state
        type is ``RUNNING``.
    """
    # Local import keeps this snippet self-contained.
    from prefect.client.schemas.sorting import FlowRunSort

    # get_client() hands back the asynchronous client by default, which can
    # only be entered with `async with`. Ask for the synchronous client
    # explicitly so that a plain `with` block and un-awaited calls work.
    with get_client(sync_client=True) as client:
        # Match only the deployment with the requested name.
        deployment_filter = DeploymentFilter(
            name=DeploymentFilterName(any_=[deployment_name])
        )
        flow_runs = client.read_flow_runs(
            deployment_filter=deployment_filter,
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": ["RUNNING"]}}
            ),
            # `limit` alone does not decide WHICH run comes back; order by
            # start time, newest first, so the single result is the latest.
            sort=FlowRunSort.START_TIME_DESC,
            limit=1,
        )
    if flow_runs:
        run = flow_runs[0]
        # NOTE(review): if PREFECT_UI_URL is not configured, value() may be
        # None and the link would start with "None" - confirm for your setup.
        ui_url = f"{PREFECT_UI_URL.value()}/flow-runs/flow-run/{run.id}"
        return run, ui_url
    return None, None
# Usage
# Query and print only when executed as a script, so that importing this
# module does not trigger an API call.
if __name__ == "__main__":
    run, ui_url = get_running_flow_runs("your-deployment-name")
    if run:
        print(f"Flow Run ID: {run.id}")
        print(f"State: {run.state.type}")
        print(f"Start Time: {run.start_time}")
        print(f"UI URL: {ui_url}")
    else:
        print("No running flow runs found for this deployment")
The key additions are:
1. We import `PREFECT_UI_URL` from `prefect.settings`
2. We construct the UI URL using the format `{PREFECT_UI_URL}/flow-runs/flow-run/{flow_run_id}`
This will give you a direct link to view the flow run in the Prefect UI. The URL will work whether you're using Prefect Cloud or a self-hosted server, as it uses your configured UI URL setting.
Note that `PREFECT_UI_URL.value()` will return the appropriate UI URL based on your Prefect configuration. If you're running a local Prefect server, this will typically be http://localhost:4200, and if you're using Prefect Cloud, it will be a URL under https://app.prefect.cloud.