Gabrielle Bennett
05/23/2024, 3:23 PMMarvin
05/23/2024, 3:23 PMprefect.client
to query the state details of a flow run, including its timestamps. Here's a concise example using the Prefect Orion API client:
python
from prefect import Client
from prefect.orion.schemas.core import FlowRun
from datetime import datetime
async def get_flow_run_state_duration(flow_run_id: str, state_name: str):
async with Client() as client:
flow_run = await client.read_flow_run(flow_run_id)
state_transitions = flow_run.state.history
state_durations = {}
for i in range(1, len(state_transitions)):
prev_state = state_transitions[i - 1]
current_state = state_transitions[i]
if prev_state.state_type == state_name:
duration = current_state.timestamp - prev_state.timestamp
state_durations[prev_state.state_id] = duration
return state_durations
# Example usage
# asyncio.run(get_flow_run_state_duration('your-flow-run-id', 'Running'))
This asynchronous function fetches the flow run details and extracts the history of state transitions. It then calculates the duration for each instance the flow run was in the specified state. Remember to replace 'your-flow-run-id'
and 'Running'
with appropriate values relevant to your use case.
For further reading and API details, you might explore the Prefect Orion Client documentation.
Sources:
- Prefect Flow Run Retention Policy
- Using the Prefect Client