Adrien Besnard
06/10/2024, 8:28 AM
Marvin
06/10/2024, 8:28 AM
Use the `get_run_context` function to access the runtime context, which includes the flow run ID.
Here is a simple example:
```python
from prefect import flow
from prefect.context import get_run_context


@flow
def my_flow():
    # Inside a flow run, the run context exposes the current FlowRun object.
    context = get_run_context()
    flow_run_id = context.flow_run.id
    print(f"The current flow run ID is: {flow_run_id}")


if __name__ == "__main__":
    my_flow()
```
This excerpt retrieves the ID of the current flow run by calling `get_run_context`, which provides runtime metadata including the flow run ID.
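As a complement to the context lookup above, here is a minimal sketch that reads the same ID through the `prefect.runtime.flow_run` module. The helper name `current_flow_run_id` is hypothetical, and the sketch assumes a Prefect 2.x release that ships `prefect.runtime`; the import is done lazily so the helper returns `None` when Prefect is not installed.

```python
from typing import Optional


def current_flow_run_id() -> Optional[str]:
    """Return the current flow run ID, or None when it cannot be determined.

    Hypothetical helper: assumes `prefect.runtime.flow_run` is available
    (Prefect 2.x). Outside a flow run, `flow_run.id` is expected to be None.
    """
    try:
        # Imported lazily so this module still loads without Prefect installed.
        from prefect.runtime import flow_run
    except ImportError:
        return None
    return flow_run.id


if __name__ == "__main__":
    print(f"Flow run ID: {current_flow_run_id()}")
```

Unlike `get_run_context`, this lookup does not need a context object in hand, which can be convenient in helper code that may also be called outside a flow.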
For further reading:
- Interactive Workflows
- Basic Flow Example
Adrien Besnard
06/10/2024, 8:43 AM
Adrien Besnard
06/10/2024, 8:49 AM
Marvin
06/10/2024, 8:50 AM
```python
import asyncio
from typing import Optional

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort


async def get_most_recent_flow_runs(
    n: int = 3, states: Optional[list[str]] = None
) -> list[FlowRun]:
    # Default to completed runs when no state types are given.
    if not states:
        states = ["COMPLETED"]

    async with get_client() as client:
        return await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(state={"type": {"any_": states}}),
            sort=FlowRunSort.END_TIME_DESC,  # most recently ended first
            limit=n,
        )


if __name__ == "__main__":
    last_3_flow_runs: list[FlowRun] = asyncio.run(get_most_recent_flow_runs())
    for run in last_3_flow_runs:
        print(f"Flow run ID: {run.id}, State: {run.state.name}, End Time: {run.end_time}")
```
This script uses the `get_client` function to connect to the Prefect API, filters for the state types of interest, and sorts the results by end time in descending order so that the most recent flow runs come first.
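To make the filter, sort, and limit steps concrete, here is a plain-Python sketch of the same selection logic applied to in-memory records. It is not Prefect code: `RunRecord` and `most_recent` are hypothetical stand-ins, and the sample data is invented purely for illustration.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class RunRecord:
    # Hypothetical stand-in for a flow run; not Prefect's FlowRun model.
    id: str
    state_type: str
    end_time: datetime


def most_recent(records, states: Optional[list] = None, n: int = 3):
    """Keep records whose state type is wanted, newest end time first, at most n."""
    wanted = set(states or ["COMPLETED"])
    matching = [r for r in records if r.state_type in wanted]
    matching.sort(key=lambda r: r.end_time, reverse=True)
    return matching[:n]


# Invented sample data for illustration only.
records = [
    RunRecord("a", "COMPLETED", datetime(2024, 6, 10, 8, 0)),
    RunRecord("b", "FAILED", datetime(2024, 6, 10, 8, 10)),
    RunRecord("c", "COMPLETED", datetime(2024, 6, 10, 8, 20)),
    RunRecord("d", "COMPLETED", datetime(2024, 6, 10, 8, 30)),
    RunRecord("e", "COMPLETED", datetime(2024, 6, 10, 8, 40)),
]

if __name__ == "__main__":
    print([r.id for r in most_recent(records)])  # ['e', 'd', 'c']
```

In the real script the API performs these steps server-side, so only `n` flow runs are returned to the client.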
For further information, you can check out the official Prefect documentation:
- Using the Client
Adrien Besnard
06/10/2024, 8:56 AM
Marvin
06/10/2024, 8:56 AM
You can configure `FlowRunFilter` to accept multiple states in its `state` filter. Below is an example that filters for both "COMPLETED" and "FAILED" flow run states:
```python
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import FlowRunFilter
from prefect.client.schemas.objects import FlowRun
from prefect.client.schemas.sorting import FlowRunSort


async def get_flow_runs_with_states(states: list[str], n: int = 3) -> list[FlowRun]:
    async with get_client() as client:
        return await client.read_flow_runs(
            # Match flow runs whose state type is any of the given values.
            flow_run_filter=FlowRunFilter(
                state={"type": {"any_": states}}
            ),
            sort=FlowRunSort.END_TIME_DESC,  # most recently ended first
            limit=n,
        )


if __name__ == "__main__":
    states_to_filter = ["COMPLETED", "FAILED"]
    flow_runs: list[FlowRun] = asyncio.run(get_flow_runs_with_states(states_to_filter))
    for run in flow_runs:
        print(f"Flow run ID: {run.id}, State: {run.state.name}, End Time: {run.end_time}")
```
In this script:
- The `FlowRunFilter` is configured with the `state` filter set to include any of the provided states.
- The `states` list contains "COMPLETED" and "FAILED", so the query returns flow runs that are in either the completed or the failed state.
You can modify the `states_to_filter` list to include any other states you wish to filter by.
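Because the list of states is free-form, a typo in a state name could go unnoticed and simply match nothing. As a minimal sketch, the hypothetical helper below normalizes and validates the names before building the `state` filter dictionary. `build_state_filter` and `VALID_STATE_TYPES` are illustrative names, and the set of state types is an assumption based on Prefect 2's `StateType` enum.

```python
# Assumed to mirror the values of Prefect 2's StateType enum.
VALID_STATE_TYPES = {
    "SCHEDULED", "PENDING", "RUNNING", "COMPLETED", "FAILED",
    "CANCELLED", "CRASHED", "PAUSED", "CANCELLING",
}


def build_state_filter(states):
    """Return a dict in the shape the scripts above pass as FlowRunFilter(state=...).

    Hypothetical helper: upper-cases each name and rejects unknown state types.
    """
    normalized = [s.strip().upper() for s in states]
    unknown = sorted(set(normalized) - VALID_STATE_TYPES)
    if unknown:
        raise ValueError(f"Unknown state types: {unknown}")
    return {"type": {"any_": normalized}}


if __name__ == "__main__":
    print(build_state_filter(["completed", "Failed"]))
    # {'type': {'any_': ['COMPLETED', 'FAILED']}}
```

The returned dictionary has the same shape as the `state` argument in the script above, so it could be passed as `FlowRunFilter(state=build_state_filter(states_to_filter))`.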