Georg Raiser
08/18/2025, 2:11 PM
Marvin
08/18/2025, 2:11 PM
Marvin
08/18/2025, 2:14 PM
- Use resolve_futures_to_states to wait for and get the final states of one or many submitted tasks.
- Each future also exposes its task_run_id if you need to query history later.
from prefect import flow, task
from prefect.futures import resolve_futures_to_states

@task
def add(x, y):
    return x + y

@flow
def main():
    f1 = add.submit(1, 2)
    f2 = add.submit(3, 4)

    # Get final states for the submitted tasks
    states = resolve_futures_to_states([f1, f2])
    for s in states:
        print("type:", s.type, "name:", s.name)

    # Access a specific task run id if you want to query later
    print("task_run_id:", f1.task_run_id)

main()
Query historical states (Python client)
- Given a task_run_id, fetch its full state history.
import asyncio
from prefect.client.orchestration import get_client

async def show_task_state_history(task_run_id: str):
    async with get_client() as client:
        states = await client.read_task_run_states(task_run_id=task_run_id)
        for st in states:
            print(st.type, st.name)

# Example: plug in the task_run_id you captured from a PrefectFuture
# asyncio.run(show_task_state_history("<your-task-run-id>"))
Query via REST API
- If you prefer HTTP (or you’re outside Python), you can hit the REST endpoint:
# PREFECT_API_URL should point at your server/cloud API, e.g. https://api.prefect.cloud/api/accounts/.../workspaces/...
curl -H "Authorization: Bearer <token>" \
  "$PREFECT_API_URL/task_run_states/?task_run_id=<task_run_id>"
Docs for reference:
- Task run state history (REST): read_task_run_states
- Futures utilities: prefect.futures module
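If you want the REST call from Python without the Prefect client, here is a minimal sketch using only the standard library. It assumes the state history is served at `/task_run_states/` with a `task_run_id` query parameter (the route I believe the Python client calls; check your server's API docs before relying on it). `build_state_history_request` is a hypothetical helper name, and it only builds the request, it does not send it.

```python
from typing import Optional
from urllib.parse import urlencode
from urllib.request import Request


def build_state_history_request(
    api_url: str, task_run_id: str, api_key: Optional[str] = None
) -> Request:
    """Build (but do not send) a GET request for a task run's state history."""
    # Assumed route: <api_url>/task_run_states/?task_run_id=<id>
    query = urlencode({"task_run_id": task_run_id})
    url = f"{api_url.rstrip('/')}/task_run_states/?{query}"

    headers = {"Accept": "application/json"}
    if api_key:
        # Only needed when the API requires a bearer token (e.g. Prefect Cloud)
        headers["Authorization"] = f"Bearer {api_key}"
    return Request(url, headers=headers, method="GET")


req = build_state_history_request("http://127.0.0.1:4200/api", "1234", api_key="secret")
print(req.get_method(), req.full_url)
```

To actually send it, pass the request to `urllib.request.urlopen(req)` and decode the JSON body.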
If you tell me whether you want states in-flight or post-run (and where you’re running: local server vs Cloud), I can tailor this further. I know, thrilling.
Georg Raiser
08/18/2025, 2:18 PM
Marvin
08/18/2025, 2:20 PM
Marvin
08/18/2025, 2:21 PM
import asyncio
from datetime import datetime, timedelta, timezone
from prefect.client.orchestration import get_client
from prefect.client.schemas.objects import StateType
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterName,
    FlowRunFilterTags,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
    TaskRunFilter,
    TaskRunFilterName,
    TaskRunFilterTags,
    TaskRunFilterStartTime,
    TaskRunFilterState,
    TaskRunFilterStateType,
    TaskRunFilterFlowRunId,
)
from prefect.client.schemas.sorting import FlowRunSort, TaskRunSort

async def demo():
    since = datetime.now(timezone.utc) - timedelta(days=1)

    # Flow runs that match: name, tags, time window, and state types
    flow_filters = FlowRunFilter(
        name=FlowRunFilterName(any_=["daily_ingest"]),
        tags=FlowRunFilterTags(all_=["prod"]),
        start_time=FlowRunFilterStartTime(after_=since),
        state=FlowRunFilterState(
            type=FlowRunFilterStateType(any_=[StateType.COMPLETED, StateType.FAILED])
        ),
    )

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=flow_filters,
            limit=10,
            sort=FlowRunSort.START_TIME_DESC,
        )
        for fr in flow_runs:
            print(fr.id, fr.name, fr.state_type, fr.start_time)

        # Task runs for a specific flow run (failed in the last day)
        if flow_runs:
            tr_filters = TaskRunFilter(
                flow_run_id=TaskRunFilterFlowRunId(any_=[flow_runs[0].id]),
                start_time=TaskRunFilterStartTime(after_=since),
                state=TaskRunFilterState(
                    type=TaskRunFilterStateType(any_=[StateType.FAILED])
                ),
                # Optionally narrow further:
                # name=TaskRunFilterName(any_=["transform"]),
                # tags=TaskRunFilterTags(any_=["etl"]),
            )
            task_runs = await client.read_task_runs(
                task_run_filter=tr_filters,
                limit=50,
                sort=TaskRunSort.EXPECTED_START_TIME_DESC,
            )
            for tr in task_runs:
                print(tr.id, tr.name, tr.state_type, tr.start_time)

asyncio.run(demo())
REST API equivalents (POST filter endpoints)
- Flow runs
curl -X POST "$PREFECT_API_URL/flow_runs/filter" \
  -H "Authorization: Bearer $PREFECT_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "limit": 10,
    "sort": "START_TIME_DESC",
    "flow_runs": {
      "name": { "any_": ["daily_ingest"] },
      "tags": { "all_": ["prod"] },
      "start_time": { "after_": "2025-08-17T00:00:00Z" },
      "state": { "type": { "any_": ["COMPLETED", "FAILED"] } }
    }
  }'
- Task runs
curl -X POST "$PREFECT_API_URL/task_runs/filter" \
  -H "Authorization: Bearer $PREFECT_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "limit": 50,
    "sort": "EXPECTED_START_TIME_DESC",
    "task_runs": {
      "flow_run_id": { "any_": ["<flow_run_id>"] },
      "start_time": { "after_": "2025-08-17T00:00:00Z" },
      "state": { "type": { "any_": ["FAILED"] } }
    }
  }'
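The curl bodies above hard-code the `after_` timestamp. If you would rather compute the window, a small sketch like this builds the same flow-run filter body in Python with only the standard library. `build_flow_run_filter` is a hypothetical helper name; the JSON shape simply mirrors the curl example and nothing here talks to the API.

```python
import json
from datetime import datetime, timedelta, timezone


def build_flow_run_filter(names, tags_all, state_types, since, limit=10):
    """Build a JSON-serializable body shaped like the flow_runs/filter curl example."""
    return {
        "limit": limit,
        "sort": "START_TIME_DESC",
        "flow_runs": {
            "name": {"any_": list(names)},
            "tags": {"all_": list(tags_all)},
            # Timezone-aware ISO 8601 timestamp, normalized to UTC
            "start_time": {"after_": since.astimezone(timezone.utc).isoformat()},
            "state": {"type": {"any_": list(state_types)}},
        },
    }


# Rolling one-day window instead of a fixed date
since = datetime.now(timezone.utc) - timedelta(days=1)
payload = build_flow_run_filter(
    ["daily_ingest"], ["prod"], ["COMPLETED", "FAILED"], since
)
print(json.dumps(payload, indent=2))
```

The resulting dict can be passed as the `json=` argument of an HTTP client, or dumped to a file and fed to curl with `-d @file.json`.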
Docs, in case you enjoy reading schemas:
- Filter classes: prefect.client.schemas.filters
- Orchestration client: prefect.client.orchestration
- REST: Flow runs API, Task runs API
If you share exactly what you want to filter on (names, tags, states, time range), I’ll tighten the snippet accordingly. With gusto, even.
Georg Raiser
08/18/2025, 3:10 PM
Marvin
08/18/2025, 3:10 PM
Marvin
08/18/2025, 3:11 PM
import os
import requests

PREFECT_API_URL = os.getenv("PREFECT_API_URL", "http://127.0.0.1:4200/api")
PREFECT_API_KEY = os.getenv("PREFECT_API_KEY")  # required for Prefect Cloud

def get_task_runs_by_tags(tags_any=None, tags_all=None, page_size=200, max_pages=None):
    """
    Query task runs filtered by tags using Prefect's REST API with pagination.
    - tags_any: returns runs that have ANY of these tags
    - tags_all: returns runs that have ALL of these tags
    """
    headers = {
        "Content-Type": "application/json",
    }
    if PREFECT_API_KEY:
        headers["Authorization"] = f"Bearer {PREFECT_API_KEY}"

    url = f"{PREFECT_API_URL}/task_runs/filter"

    # Build the tag filter
    tag_filter = {}
    if tags_any:
        tag_filter["any_"] = tags_any
    if tags_all:
        tag_filter["all_"] = tags_all

    results = []
    offset = 0
    pages = 0
    while True:
        payload = {
            "task_runs": {
                "tags": tag_filter
            },
            # Adjust as needed:
            "limit": page_size,
            "offset": offset,
            "sort": "EXPECTED_START_TIME_DESC"
        }
        resp = requests.post(url, headers=headers, json=payload, timeout=30)
        if resp.status_code != 200:
            raise RuntimeError(f"Prefect API error {resp.status_code}: {resp.text}")

        batch = resp.json() or []
        results.extend(batch)
        if len(batch) < page_size:
            break  # no more pages

        offset += page_size
        pages += 1
        if max_pages is not None and pages >= max_pages:
            break
    return results

if __name__ == "__main__":
    # Example: any of these tags
    runs = get_task_runs_by_tags(tags_any=["prod", "billing"], page_size=100)
    print(f"Found {len(runs)} task runs")
    for tr in runs[:10]:
        print(tr["id"], tr.get("name"), tr.get("state_type"), tr.get("start_time"))
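If you want to sanity-check the limit/offset loop without a running API, here is a rough sketch of the same pagination pattern with the HTTP call factored out. `fetch_all` and `fake_fetch` are hypothetical names, and the "backend" is just an in-memory list standing in for the filter endpoint.

```python
from typing import Callable, Dict, List, Optional


def fetch_all(
    fetch_page: Callable[[int, int], List[Dict]],
    page_size: int = 200,
    max_pages: Optional[int] = None,
) -> List[Dict]:
    """Collect results page by page until a short page or max_pages is reached."""
    results: List[Dict] = []
    offset = 0
    pages = 0
    while max_pages is None or pages < max_pages:
        batch = fetch_page(page_size, offset)
        results.extend(batch)
        pages += 1
        if len(batch) < page_size:
            break  # a short (or empty) page means there is nothing left
        offset += page_size
    return results


# In-memory stand-in for the API: 250 fake task runs
FAKE_RUNS = [{"id": i} for i in range(250)]
calls = []


def fake_fetch(limit: int, offset: int) -> List[Dict]:
    calls.append((limit, offset))
    return FAKE_RUNS[offset:offset + limit]


runs = fetch_all(fake_fetch, page_size=100)
print(len(runs), calls)
```

Swapping `fake_fetch` for a function that posts `limit` and `offset` to the filter endpoint gives you the real thing, and keeps the loop testable on its own.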
Notes
- For Prefect Cloud, PREFECT_API_URL should include your account/workspace path, e.g.:
https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>
- Tag filter operators:
- any_: match if a run has any of the listed tags
- all_: match only if it has all of them
- not_any_: exclude runs with any of the listed tags (if you need it)
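To make the operator semantics concrete, here is a purely local illustration using sets. It does not call Prefect and is not Prefect's implementation; `matches_tags` is a hypothetical helper that just encodes the three rules as described in the list above.

```python
def matches_tags(run_tags, any_=None, all_=None, not_any_=None):
    """Return True if run_tags satisfies every operator that was provided."""
    tags = set(run_tags)
    # any_: at least one of the listed tags must be present
    if any_ and not tags & set(any_):
        return False
    # all_: every listed tag must be present
    if all_ and not set(all_) <= tags:
        return False
    # not_any_: none of the listed tags may be present
    if not_any_ and tags & set(not_any_):
        return False
    return True


run = ["prod", "billing"]
print(matches_tags(run, any_=["prod", "etl"]))
print(matches_tags(run, all_=["prod", "etl"]))
print(matches_tags(run, not_any_=["staging"]))
```

Operators that are omitted (or empty) are treated as "no constraint" in this sketch, so combining them narrows the match rather than widening it.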
REST docs if you’d like to stare into the abyss:
- Task runs API: Task runs REST endpoints
- Filter schema reference: prefect.client.schemas.filters
If you also need to filter by time window or state while you’re at it, say so and I’ll add it. I live to serve. Begrudgingly.