Marco Ruta
08/12/2024, 8:30 AMMarvin
08/12/2024, 8:30 AMpython
import asyncio
from prefect.client import get_client
async def get_flow_run_state(flow_run_id: str):
async with get_client() as client:
flow_run = await client.read_flow_run(flow_run_id)
return flow_run.state
flow_run_id = "your-flow-run-id" # Replace with your actual flow run ID
flow_run_state = asyncio.run(get_flow_run_state(flow_run_id))
print(f"Flow run state: {flow_run_state}")
For more details and example code, you can refer to the Prefect documentation here. Now, you can monitor the state of your flow runs easily.
If you need further examples, you can check this code example.Marco Ruta
08/12/2024, 8:41 AMMarvin
08/12/2024, 8:41 AMpython
import asyncio
from prefect.client import get_client
async def get_flow_run_details(flow_run_id: str):
async with get_client() as client:
flow_run = await client.read_flow_run(flow_run_id)
state = flow_run.state
details = await state.result(fetch=True) # Fetch detailed result if necessary
return {
"flow_run_id": flow_run_id,
"state": state.name, # Assuming `state.name` gives a string representation of the state
"details": details,
"start_time": flow_run.start_time,
"end_time": flow_run.end_time,
"message": state.message # Any message associated with the state
}
flow_run_id = "your-flow-run-id" # Replace with your actual flow run ID
flow_run_details = asyncio.run(get_flow_run_details(flow_run_id))
print(f"Flow Run ID: {flow_run_details['flow_run_id']}")
print(f"State: {flow_run_details['state']}")
print(f"Details: {flow_run_details['details']}")
print(f"Start Time: {flow_run_details['start_time']}")
print(f"End Time: {flow_run_details['end_time']}")
print(f"Message: {flow_run_details['message']}")
This script should provide a clearer and more complete view of the flow run state, including the state name, any detailed results, and additional metadata like start and end times.
For more details about Prefect flow states, you can refer to the documentation here.Marco Ruta
08/12/2024, 9:23 AMMarvin
08/12/2024, 9:24 AMpython
import asyncio
from prefect.client import get_client
async def get_task_runs_details(flow_run_id: str):
async with get_client() as client:
# Retrieve all task runs for the given flow run ID
task_runs = await client.read_task_runs(flow_run_id=flow_run_id)
# Format the task run details
task_details = [
{
"task_run_id": task_run.id,
"task_name": task_run.task_name,
"status": task_run.state.name, # Assuming `state.name` gives the status
"details": await task_run.state.result(fetch=True) # Fetch detailed result if necessary
}
for task_run in task_runs
]
return task_details
flow_run_id = "your-flow-run-id" # Replace with your actual flow run ID
task_runs_details = asyncio.run(get_task_runs_details(flow_run_id))
for task in task_runs_details:
print(f"Task Run ID: {task['task_run_id']}")
print(f"Task Name: {task['task_name']}")
print(f"Status: {task['status']}")
print(f"Details: {task['details']}")
print("-" * 40)
This script will print out the details of each task run, including the task run ID, task name, status, and any additional details.
For more detailed information, you can refer to the Prefect documentation on tasks.
This should give you complete visibility into the task runs within a flow run.Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by