Fernando Correia
02/27/2025, 10:36 PMMarvin
02/27/2025, 10:37 PMPrefectClient
class from the prefect.client.orchestration
module. Here's a concise script that demonstrates how to do this:
python
from prefect.client.orchestration import PrefectClient
from prefect.schemas.filters import FlowRunFilter, FlowRunFilterStateName
async def list_running_flow_runs():
async with PrefectClient() as client:
# Define a filter for flow runs in the "Running" state
state_filter = FlowRunFilterStateName(any_=["Running"])
flow_run_filter = FlowRunFilter(state_name=state_filter)
# Query for flow runs that match the filter
flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter)
# Print details of running flow runs
for flow_run in flow_runs:
print(f"Flow Run ID: {flow_run.id}, Name: {flow_run.name}")
# To execute the async function
import asyncio
asyncio.run(list_running_flow_runs())
### Key Points:
- PrefectClient: Used to interact with Prefect's orchestration layer.
- FlowRunFilter: Allows filtering of flow runs based on various criteria, including state.
- FlowRunFilterStateName: Specifically filters flow runs by their state name.
For more details, you can explore the Prefect documentation or the Prefect GitHub repository.Fernando Correia
02/27/2025, 10:38 PMNate
02/27/2025, 10:38 PMMarvin
02/27/2025, 10:38 PMpython
from prefect.client.orchestration import PrefectClient
from prefect.schemas.filters import FlowRunFilter, FlowRunFilterStateName
from prefect.states import Cancelled
async def cancel_running_flow_runs():
async with PrefectClient() as client:
# Define a filter for flow runs in the "Running" state
state_filter = FlowRunFilterStateName(any_=["Running"])
flow_run_filter = FlowRunFilter(state_name=state_filter)
# Query for flow runs that match the filter
flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter)
# Cancel each running flow run
for flow_run in flow_runs:
await client.set_flow_run_state(flow_run.id, Cancelled())
print(f"Cancelled Flow Run ID: {flow_run.id}, Name: {flow_run.name}")
# To execute the async function
import asyncio
asyncio.run(cancel_running_flow_runs())
### Key Points:
- Cancelled State: The Cancelled
state is used to indicate that a flow run should be stopped.
- set_flow_run_state: This method is used to change the state of a flow run.
This script will find all flow runs in the "Running" state and cancel them. Be cautious when using this in a production environment, as it will terminate running processes. For more details, you can refer to the Prefect documentation.Fernando Correia
02/27/2025, 10:41 PMMarvin
02/27/2025, 10:42 PMpython
from prefect.client.orchestration import PrefectClient
from prefect.schemas.filters import FlowRunFilter, FlowRunFilterStateName
from prefect.states import Cancelled
async def disable_work_pool(pool_name: str):
async with PrefectClient() as client:
# Assuming a method to update work pool status exists
work_pools = await client.read_work_pools()
for pool in work_pools:
if pool.name == pool_name:
await client.update_work_pool(pool.id, is_paused=True)
print(f"Disabled Work Pool: {pool.name}")
async def cancel_running_flow_runs():
async with PrefectClient() as client:
# Define a filter for flow runs in the "Running" state
state_filter = FlowRunFilterStateName(any_=["Running"])
flow_run_filter = FlowRunFilter(state_name=state_filter)
# Query for flow runs that match the filter
flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter)
# Cancel each running flow run
for flow_run in flow_runs:
await client.set_flow_run_state(flow_run.id, Cancelled())
print(f"Cancelled Flow Run ID: {flow_run.id}, Name: {flow_run.name}")
# To execute the async functions
import asyncio
asyncio.run(disable_work_pool("my-work-pool"))
asyncio.run(cancel_running_flow_runs())
### Key Points:
- disable_work_pool: This function attempts to disable a work pool by name.
- update_work_pool: Assumed method to update the state of a work pool. You might need to refer to the Prefect API documentation for the exact method to pause or disable a work pool.
This script assumes the existence of a method to update the work pool's status, which may require checking Prefect's API documentation for precise implementation details.Fernando Correia
02/27/2025, 10:42 PMMarvin
02/27/2025, 10:42 PMFlowRunFilter
and FlowRunFilterStateName
are not directly available in the Prefect 3.x API as initially assumed. Let's adjust the script to ensure compatibility with Prefect 3.x:
python
from prefect.client.orchestration import PrefectClient
from prefect.states import Cancelled
async def disable_work_pool(pool_name: str):
async with PrefectClient() as client:
work_pools = await client.read_work_pools()
for pool in work_pools:
if pool.name == pool_name:
await client.update_work_pool(pool.id, is_paused=True)
print(f"Disabled Work Pool: {pool.name}")
async def cancel_running_flow_runs():
async with PrefectClient() as client:
# Retrieve all flow runs
flow_runs = await client.read_flow_runs()
# Filter and cancel running flow runs
for flow_run in flow_runs:
if flow_run.state.name == "Running":
await client.set_flow_run_state(flow_run.id, Cancelled())
print(f"Cancelled Flow Run ID: {flow_run.id}, Name: {flow_run.name}")
# To execute the async functions
import asyncio
asyncio.run(disable_work_pool("my-work-pool"))
asyncio.run(cancel_running_flow_runs())
### Key Adjustments:
- FlowRunFilter: Since the direct filter classes were not found, the script now manually filters flow runs by checking their state name.
- Work Pool Management: Assumes the existence of a method to update the work pool's status (e.g., update_work_pool
), which might require checking Prefect's API documentation for precise implementation.
This script should be compatible with Prefect 3.x, assuming the correct methods are used for managing work pools. You might need to refer to the Prefect API documentation for the exact implementation details of work pool management.Nate
02/27/2025, 10:43 PMprefect.client.schemas.filters
It seems that theandFlowRunFilter
are not directly available in the Prefect 3.x APIFlowRunFilterStateName
Marvin
02/27/2025, 10:43 PMFlowRunFilter
and FlowRunFilterStateName
classes are indeed available in the prefect.client.schemas.filters
module in Prefect 3.x. Here's the corrected script using these filters to ensure compatibility:
python
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterStateName
from prefect.states import Cancelled
async def disable_work_pool(pool_name: str):
async with PrefectClient() as client:
work_pools = await client.read_work_pools()
for pool in work_pools:
if pool.name == pool_name:
await client.update_work_pool(pool.id, is_paused=True)
print(f"Disabled Work Pool: {pool.name}")
async def cancel_running_flow_runs():
async with PrefectClient() as client:
# Define a filter for flow runs in the "Running" state
state_filter = FlowRunFilterStateName(any_=["Running"])
flow_run_filter = FlowRunFilter(state_name=state_filter)
# Query for flow runs that match the filter
flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter)
# Cancel each running flow run
for flow_run in flow_runs:
await client.set_flow_run_state(flow_run.id, Cancelled())
print(f"Cancelled Flow Run ID: {flow_run.id}, Name: {flow_run.name}")
# To execute the async functions
import asyncio
asyncio.run(disable_work_pool("my-work-pool"))
asyncio.run(cancel_running_flow_runs())
### Key Points:
- FlowRunFilter and FlowRunFilterStateName: These classes are used to filter flow runs by their state.
- PrefectClient: Used to interact with the Prefect API for both reading flow runs and updating their states.
This script should now be fully compatible with Prefect 3.x, utilizing the correct filtering mechanisms.Nate
02/27/2025, 10:44 PMfrom prefect import get_client
and then async with get_client() as client
instead of using PrefectClient
directly but otherwise looks ballpark right to me