Robert Denham
07/05/2025, 7:46 AM
Marvin
07/05/2025, 7:47 AM
```python
from prefect import flow, task


@task
def task1(x):
    return x + 1


@task
def task2(x):
    return x * 2


@flow
def my_flow(x):
    # Tasks are called as regular functions
    result1 = task1(x)
    result2 = task2(result1)
    return result2
```
If you need to keep track of tasks for monitoring or organizational purposes, you have a few options:
1. Structure your code to maintain your own registry of tasks:
```python
from prefect import flow, task

flow_tasks = []


def register_task(func):
    flow_tasks.append(func)
    return task(func)


@register_task
def task1(x):
    return x + 1


@register_task
def task2(x):
    return x * 2

# Now flow_tasks contains references to your task functions
```
2. Use flow runs to monitor actual task execution:
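```python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterId


async def get_flow_run_tasks(flow_run_id):
    async with get_client() as client:
        # read_task_runs has no flow_run_id argument; filter by flow run instead
        return await client.read_task_runs(
            flow_run_filter=FlowRunFilter(id=FlowRunFilterId(any_=[flow_run_id]))
        )
```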
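The registry idea in option 1 does not depend on Prefect itself. The sketch below is minimal and runs without Prefect. It keeps a name-to-function mapping so that registered tasks can be looked up later. The names `TASK_REGISTRY` and `identity_wrapper`, and the `wrapper` parameter, are all hypothetical. With Prefect installed, you would presumably make `prefect.task` the default wrapper instead of `identity_wrapper`.

```python
from typing import Callable, Dict

# Hypothetical registry: task name -> original (unwrapped) function
TASK_REGISTRY: Dict[str, Callable] = {}


def identity_wrapper(func: Callable) -> Callable:
    # Hypothetical stand-in for prefect.task so this sketch runs without Prefect
    return func


def register_task(func: Callable, wrapper: Callable = identity_wrapper) -> Callable:
    """Record func under its name, then return the wrapped version."""
    TASK_REGISTRY[func.__name__] = func
    return wrapper(func)


@register_task
def task1(x):
    return x + 1


@register_task
def task2(x):
    return x * 2


def my_flow(x):
    # Same data flow as the Prefect example: task2(task1(x))
    return task2(task1(x))


print(sorted(TASK_REGISTRY))  # ['task1', 'task2']
print(my_flow(3))             # 8
```

A dict rather than a list makes lookups by name direct. The tradeoff is that two tasks with the same function name would overwrite each other.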
Remember that Prefect's approach is intentionally different from tools like Airflow, where DAGs are explicitly defined. In Prefect, the flow structure is determined by how data flows between tasks during execution, making it more flexible and Pythonic.
For more information about flows and tasks, you can refer to:
- Task API Reference
- Flow API Reference
Robert Denham
07/05/2025, 7:48 AM
Marvin
07/05/2025, 7:49 AM
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterId


async def get_flow_tasks(flow_run_id: str):
    async with get_client() as client:
        # Get all task runs for the specific flow run
        # (read_task_runs takes filter objects, not a flow_run_id keyword)
        task_runs = await client.read_task_runs(
            flow_run_filter=FlowRunFilter(id=FlowRunFilterId(any_=[flow_run_id]))
        )

        # Each task run contains details about the task execution
        for task_run in task_runs:
            print(f"Task name: {task_run.name}")
            print(f"Task state: {task_run.state_name}")
            print(f"Started: {task_run.start_time}")
            print(f"Finished: {task_run.end_time}")
            print("---")

        return task_runs


# Usage example:
# Replace with your actual flow run ID
flow_run_id = "your-flow-run-id"
task_runs = asyncio.run(get_flow_tasks(flow_run_id))
```
To get the task runs from the most recent run of a flow with a given name, look up that flow run first:
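```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterName
from prefect.client.schemas.sorting import FlowRunSort


async def get_latest_flow_tasks(flow_name: str):
    async with get_client() as client:
        # Get the most recent flow run for the given flow name
        flow_runs = await client.read_flow_runs(
            flow_filter=FlowFilter(name=FlowFilterName(any_=[flow_name])),
            sort=FlowRunSort.START_TIME_DESC,  # Sort by start time descending
            limit=1,
        )

    if not flow_runs:
        print(f"No flow runs found for flow: {flow_name}")
        return []

    # Reuse get_flow_tasks from the previous example
    return await get_flow_tasks(str(flow_runs[0].id))


# Usage example:
flow_name = "your-flow-name"
task_runs = asyncio.run(get_latest_flow_tasks(flow_name))
```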
Each TaskRun object contains useful information about the task execution, including:
- `name`: The name of the task
- `state_name`: The display name of the task's final state (e.g., "Completed", "Failed")
- `start_time`: When the task started
- `end_time`: When the task finished
- `task_key`: A key identifying which task definition produced the run
- `flow_run_id`: The ID of the parent flow run
You can access all these attributes from the returned task runs to get a complete picture of what happened during the flow execution.
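Because these are plain attributes, post-processing them needs no further API calls. The sketch below is minimal and assumes only the attribute names listed above. `TaskRunInfo` is a hypothetical stand-in that lets the example run without a Prefect server, and `summarize_task_runs` is an illustrative helper, not part of Prefect. It should also accept real `TaskRun` objects, since it only reads `name`, `state_name`, `start_time` and `end_time`.

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable, List, Optional, Tuple


@dataclass
class TaskRunInfo:
    """Hypothetical stand-in using the same attribute names as Prefect's TaskRun."""
    name: str
    state_name: Optional[str]
    start_time: Optional[datetime]
    end_time: Optional[datetime]


def summarize_task_runs(
    task_runs: Iterable,
) -> Tuple[List[Tuple[str, Optional[float]]], Counter]:
    """Return (name, duration in seconds) pairs plus a count of runs per state name.

    The duration is None when a run has not started or has not finished yet.
    """
    durations: List[Tuple[str, Optional[float]]] = []
    states: Counter = Counter()
    for tr in task_runs:
        if tr.start_time is not None and tr.end_time is not None:
            seconds = (tr.end_time - tr.start_time).total_seconds()
        else:
            seconds = None
        durations.append((tr.name, seconds))
        states[tr.state_name] += 1
    return durations, states


# Illustrative data only
t0 = datetime(2025, 1, 1, 12, 0, 0)
runs = [
    TaskRunInfo("task1-0", "Completed", t0, t0 + timedelta(seconds=2)),
    TaskRunInfo("task2-0", "Failed", t0 + timedelta(seconds=2), t0 + timedelta(seconds=5)),
    TaskRunInfo("task2-1", "Running", t0 + timedelta(seconds=5), None),
]
durations, states = summarize_task_runs(runs)
print(durations)  # [('task1-0', 2.0), ('task2-0', 3.0), ('task2-1', None)]
```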
For more details about working with the Prefect API client, you can refer to the API Client documentation.
Robert Denham
07/05/2025, 7:50 AM
Marvin
07/05/2025, 7:53 AM
```python
import asyncio
import logging

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter, LogFilterTaskRunId
from prefect.client.schemas.sorting import LogSort


async def get_task_logs(task_run_id: str):
    async with get_client() as client:
        # Create a log filter for the specific task run
        log_filter = LogFilter(task_run_id=LogFilterTaskRunId(any_=[task_run_id]))

        # Retrieve the logs, oldest first
        logs = await client.read_logs(
            log_filter=log_filter,
            sort=LogSort.TIMESTAMP_ASC,
        )
        return logs


async def print_task_logs(task_run_id: str):
    logs = await get_task_logs(task_run_id)
    for log in logs:
        # log.level is numeric; convert it to a name such as "INFO"
        print(f"[{log.timestamp}] {logging.getLevelName(log.level)}: {log.message}")


# Usage example:
task_run_id = "your-task-run-id"
asyncio.run(print_task_logs(task_run_id))
```
Each log entry contains several useful fields:
- `timestamp`: When the log was created
- `level`: Numeric log level, as in Python's `logging` module (e.g., 20 for INFO, 40 for ERROR)
- `message`: The actual log message
- `flow_run_id`: ID of the parent flow run
- `task_run_id`: ID of the task run
- `name`: Logger name
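Since `level` is stored as a number, the standard library can convert it back to a name. The sketch below is minimal. `LogInfo` and `format_logs` are hypothetical names, and only the field names come from the list above. The sketch orders entries by timestamp, optionally drops low-severity entries on the client side, and renders each entry with `logging.getLevelName`.

```python
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List


@dataclass
class LogInfo:
    """Hypothetical stand-in using the same field names as a Prefect log entry."""
    timestamp: datetime
    level: int  # numeric, as in the standard logging module (20 = INFO)
    message: str


def format_logs(logs: Iterable, min_level: int = logging.NOTSET) -> List[str]:
    """Sort entries by timestamp, skip those below min_level, render one line each."""
    lines = []
    for log in sorted(logs, key=lambda entry: entry.timestamp):
        if log.level >= min_level:
            name = logging.getLevelName(log.level)
            lines.append(f"[{log.timestamp:%H:%M:%S}] {name}: {log.message}")
    return lines


# Illustrative data only, deliberately out of order
logs = [
    LogInfo(datetime(2025, 1, 1, 12, 0, 5), logging.ERROR, "Task run failed"),
    LogInfo(datetime(2025, 1, 1, 12, 0, 1), logging.INFO, "Task run started"),
]
for line in format_logs(logs):
    print(line)
# [12:00:01] INFO: Task run started
# [12:00:05] ERROR: Task run failed
```

Filtering on the client side like this is only reasonable for small result sets. For anything larger, a server-side `level` filter avoids transferring logs you will discard.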
You can also add additional filters to narrow down the logs:
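```python
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    LogFilter,
    LogFilterLevel,
    LogFilterTaskRunId,
    LogFilterTimestamp,
)
from prefect.client.schemas.sorting import LogSort


async def get_filtered_task_logs(
    task_run_id: str,
    min_level: int = logging.INFO,
    start_time: Optional[datetime] = None,
):
    async with get_client() as client:
        log_filter = LogFilter(
            task_run_id=LogFilterTaskRunId(any_=[task_run_id]),
            # Levels are integers; ge_ keeps logs at or above min_level
            level=LogFilterLevel(ge_=min_level),
            # Only logs at or after start_time (no time filter when None)
            timestamp=LogFilterTimestamp(after_=start_time) if start_time else None,
        )
        return await client.read_logs(log_filter=log_filter, sort=LogSort.TIMESTAMP_ASC)


# Example usage with filters
async def print_recent_error_logs(task_run_id: str):
    # Get ERROR logs from the last hour (timezone-aware UTC)
    start_time = datetime.now(timezone.utc) - timedelta(hours=1)
    logs = await get_filtered_task_logs(
        task_run_id=task_run_id,
        min_level=logging.ERROR,
        start_time=start_time,
    )
    for log in logs:
        print(f"[{log.timestamp}] {logging.getLevelName(log.level)}: {log.message}")
```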
The logs are returned in chronological order, giving you a complete picture of what happened during the task execution. This can be particularly useful for debugging or auditing purposes.
For more information about working with logs, you can refer to the Prefect Client API documentation.
Robert Denham
07/05/2025, 7:59 AM
Marvin
07/05/2025, 7:59 AM
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterId
from prefect.client.schemas.sorting import TaskRunSort


async def get_flow_tasks(flow_id: str):
    # flow_id is the ID of the flow itself, not of a single flow run
    async with get_client() as client:
        # Create a flow filter with the flow ID
        flow_filter = FlowFilter(id=FlowFilterId(any_=[flow_id]))

        # Get task runs from every run of this flow
        task_runs = await client.read_task_runs(
            flow_filter=flow_filter,
            sort=TaskRunSort.EXPECTED_START_TIME_ASC,  # Optional: oldest first
        )
        return task_runs


async def print_flow_tasks(flow_id: str):
    task_runs = await get_flow_tasks(flow_id)
    for task_run in task_runs:
        print(f"Task: {task_run.name}")
        print(f"State: {task_run.state_name}")
        print(f"Started: {task_run.start_time}")
        print(f"Finished: {task_run.end_time}")
        print("---")


# Usage example:
flow_id = "your-flow-id"
asyncio.run(print_flow_tasks(flow_id))
```
You can also add additional filters to narrow down the results. For example, to get only completed tasks from a specific flow:
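```python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowFilter,
    FlowFilterId,
    TaskRunFilter,
    TaskRunFilterState,
    TaskRunFilterStateType,
)
from prefect.client.schemas.objects import StateType
from prefect.client.schemas.sorting import TaskRunSort


async def get_completed_flow_tasks(flow_id: str):
    async with get_client() as client:
        flow_filter = FlowFilter(id=FlowFilterId(any_=[flow_id]))
        # Task run state is filtered through the nested `state` field;
        # matching the state *type* avoids depending on display names
        task_filter = TaskRunFilter(
            state=TaskRunFilterState(
                type=TaskRunFilterStateType(any_=[StateType.COMPLETED])
            )
        )
        task_runs = await client.read_task_runs(
            flow_filter=flow_filter,
            task_run_filter=task_filter,
            sort=TaskRunSort.EXPECTED_START_TIME_ASC,
        )
        return task_runs
```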
Each TaskRun object in the returned list contains detailed information about the task execution, including:
- `name`: The name of the task
- `state_name`: Display name of the task's current state (e.g., "Completed")
- `start_time`: When the task started
- `end_time`: When the task finished
- `task_key`: A key identifying which task definition produced the run
- `state_type`: The type of state (e.g., COMPLETED, FAILED, etc.)
This returns the task runs from every run of the specified flow, which you can then process or analyze as needed.
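A flow-level query mixes task runs from every run of that flow, so it is often useful to regroup them. The sketch below is minimal and uses a hypothetical `TaskRunRecord` stand-in; real `TaskRun` objects carry the same attribute names. It groups task run names by their parent `flow_run_id` and counts the runs per `task_key` that are in a given state. The helper names are illustrative, not Prefect APIs. With real objects, pass `StateType.COMPLETED` from `prefect.client.schemas.objects` rather than a plain string.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass
class TaskRunRecord:
    """Hypothetical stand-in using the same attribute names as Prefect's TaskRun."""
    name: str
    task_key: str
    flow_run_id: str
    state_type: str  # a StateType enum in Prefect; plain strings here


def group_by_flow_run(task_runs: Iterable) -> Dict[str, List[str]]:
    """Map each parent flow run ID to the names of its task runs, in input order."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for tr in task_runs:
        grouped[tr.flow_run_id].append(tr.name)
    return dict(grouped)


def runs_per_task_key(task_runs: Iterable, state_type="COMPLETED") -> Dict[str, int]:
    """Count task runs in the given state, per task definition (task_key)."""
    counts: Dict[str, int] = defaultdict(int)
    for tr in task_runs:
        if tr.state_type == state_type:
            counts[tr.task_key] += 1
    return dict(counts)


# Illustrative data only
runs = [
    TaskRunRecord("task1-0", "key-task1", "run-a", "COMPLETED"),
    TaskRunRecord("task2-0", "key-task2", "run-a", "FAILED"),
    TaskRunRecord("task1-1", "key-task1", "run-b", "COMPLETED"),
]
print(group_by_flow_run(runs))  # {'run-a': ['task1-0', 'task2-0'], 'run-b': ['task1-1']}
print(runs_per_task_key(runs))  # {'key-task1': 2}
```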