Pascal Jahan Elahi (Pawsey)
01/16/2025, 7:13 AMMarvin
01/16/2025, 7:13 AMcancel
function from the prefect.flow_run
module. Here's how you can implement a task that cancels another flow:
1. Import the necessary Prefect components.
2. Create a task that calls the cancel
function with the appropriate flow run ID.
Here's a basic example:
python
from prefect import flow, task, get_client
from prefect.client.orchestration import PrefectClient
@task
async def cancel_flow(flow_run_id: str):
async with get_client() as client:
await client.cancel_flow_run(flow_run_id)
@flow
async def control_flow():
# Replace with the actual flow run ID you wish to cancel
flow_run_id = "your-flow-run-id"
await cancel_flow(flow_run_id)
if __name__ == "__main__":
control_flow()
This example assumes you have the flow run ID of the flow you wish to cancel. The cancel_flow
task uses the Prefect client to cancel the specified flow run.
For more details, you can refer to the documentation: Prefect 3.x Cancel Flow RunPascal Jahan Elahi (Pawsey)
01/16/2025, 7:17 AMMarvin
01/16/2025, 7:17 AMpython
from prefect import task, Flow
from prefect.client import Client
@task
def cancel_another_flow(flow_run_id: str):
client = Client()
client.cancel_flow_run(flow_run_id)
with Flow("Control Flow") as flow:
# Replace with the actual flow run ID you wish to cancel
flow_run_id = "your-flow-run-id"
cancel_another_flow(flow_run_id)
if __name__ == "__main__":
flow.run()
This example uses the Client
from Prefect to cancel the specified flow run. You need to replace "your-flow-run-id"
with the actual ID of the flow run you want to cancel.
For more details on flow management, you can refer to the documentation: Prefect 2.x FlowsPascal Jahan Elahi (Pawsey)
01/16/2025, 7:19 AMMarvin
01/16/2025, 7:19 AMpython
from prefect.client import Client
def find_flows_by_run_id(flow_run_id: str):
client = Client()
# Retrieve the flow run details
flow_run = client.get_flow_run_info(flow_run_id)
# Print the flow name associated with this flow run
print(f"Flow Name: {flow_run.flow.name}")
print(f"Flow Run ID: {flow_run.id}")
# Replace with the actual flow run ID you wish to query
flow_run_id = "your-flow-run-id"
find_flows_by_run_id(flow_run_id)
This script uses the get_flow_run_info
method to retrieve details about a specific flow run, then extracts and prints the flow name associated with that run. Make sure to replace "your-flow-run-id"
with the actual flow run ID you're interested in.
For more detailed usage, you can refer to the documentation: Using the Prefect ClientPascal Jahan Elahi (Pawsey)
01/16/2025, 7:50 AMMarvin
01/16/2025, 7:50 AMpython
from prefect.client import Client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort
def get_running_flows():
client = Client()
# Define a filter for flow runs in the "Running" state
flow_run_filter = FlowRunFilter(state={"name": {"any_": ["Running"]}})
# Retrieve running flow runs
running_flow_runs = client.read_flow_runs(flow_run_filter)
# Print details of the running flows
for flow_run in running_flow_runs:
print(f"Flow Name: {flow_run.flow.name}, Flow Run ID: {flow_run.id}")
get_running_flows()
This script uses the read_flow_runs
method with a filter to get all flow runs currently in the "Running" state. It then prints the names and IDs of these flow runs.
For more detailed usage, you can refer to the documentation: Using the Prefect ClientPascal Jahan Elahi (Pawsey)
01/16/2025, 7:54 AMMarvin
01/16/2025, 7:55 AMpython
from prefect.client import Client
from prefect.client.schemas.filters import FlowRunFilter, FlowFilter
def get_running_flows_by_name(flow_name: str):
client = Client()
# Define a filter for flow runs in the "Running" state and with the specified name
flow_run_filter = FlowRunFilter(
state={"name": {"any_": ["Running"]}},
flow=FlowFilter(name={"any_": [flow_name]})
)
# Retrieve running flow runs with the specified name
running_flow_runs = client.read_flow_runs(flow_run_filter)
# Print details of the running flows with the specified name
for flow_run in running_flow_runs:
print(f"Flow Name: {flow_run.flow.name}, Flow Run ID: {flow_run.id}")
# Replace with the actual flow name you wish to query
flow_name = "your-flow-name"
get_running_flows_by_name(flow_name)
This script uses the read_flow_runs
method with filters to get all flow runs that are currently running and have a specific name. Make sure to replace "your-flow-name"
with the actual name of the flow you're interested in.
For more detailed usage, you can refer to the documentation: Using the Prefect ClientPascal Jahan Elahi (Pawsey)
01/16/2025, 7:59 AMMarvin
01/16/2025, 7:59 AMpython
from prefect.client import Client
from prefect.client.schemas.filters import FlowRunFilter, FlowFilter
def cancel_running_flows_by_name(flow_name: str):
client = Client()
# Define a filter for flow runs in the "Running" state and with the specified name
flow_run_filter = FlowRunFilter(
state={"name": {"any_": ["Running"]}},
flow=FlowFilter(name={"any_": [flow_name]})
)
# Retrieve running flow runs with the specified name
running_flow_runs = client.read_flow_runs(flow_run_filter)
# Cancel each running flow
for flow_run in running_flow_runs:
print(f"Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
client.set_flow_run_state(flow_run.id, state="Cancelled", force=True)
# Replace with the actual flow name you wish to cancel
flow_name = "your-flow-name"
cancel_running_flows_by_name(flow_name)
This script filters for running flow runs with a specific name and then cancels each one by setting its state to "Cancelled". Make sure to replace "your-flow-name"
with the actual name of the flows you want to cancel.
For more detailed usage, you can refer to the documentation: Using the Prefect Client