<@ULVA73B9P> how do I retrieve the states of my ta...
# marvin-ai
g
@Marvin how do I retrieve the states of my tasks
m
thought for 190.5 seconds
Are you trying to get them while the flow is running, or after the fact? Here are both. Assuming Prefect 3.x.
During a flow (from PrefectFutures)
- Use resolve_futures_to_states to wait and get final states for one or many submitted tasks.
- Each future also exposes its task_run_id if you need to query history later.
from prefect import flow, task
from prefect.futures import resolve_futures_to_states

@task
def add(x, y):
    return x + y

@flow
def main():
    f1 = add.submit(1, 2)
    f2 = add.submit(3, 4)

    # Get final states for the submitted tasks
    states = resolve_futures_to_states([f1, f2])
    for s in states:
        print("type:", s.type, "name:", s.name)

    # Access a specific task run id if you want to query later
    print("task_run_id:", f1.task_run_id)

main()
Query historical states (Python client)
- Given a task_run_id, fetch its full state history.
import asyncio
from prefect.client.orchestration import get_client

async def show_task_state_history(task_run_id: str):
    async with get_client() as client:
        states = await client.read_task_run_states(task_run_id=task_run_id)
        for st in states:
            print(st.type, st.name)

# Example: plug in the task_run_id you captured from a PrefectFuture
# asyncio.run(show_task_state_history("<your-task-run-id>"))
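A state history is easier to read as a timeline than as a bare list of type/name pairs. The sketch below is one way to do that, assuming each state exposes type, name and timestamp attributes (with timestamp being a datetime). The helper name summarize_state_history and the SimpleNamespace sample records are made up for illustration; they stand in for whatever read_task_run_states returns.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def summarize_state_history(states):
    """Return one 'TYPE (name) +N.Ns' line per state, oldest first."""
    ordered = sorted(states, key=lambda s: s.timestamp)
    lines = []
    for s in ordered:
        # Seconds elapsed since the earliest state in the history
        elapsed = (s.timestamp - ordered[0].timestamp).total_seconds()
        # Accept plain strings as well as enum members that carry a .value
        type_label = getattr(s.type, "value", s.type)
        lines.append(f"{type_label} ({s.name}) +{elapsed:.1f}s")
    return lines


# Hypothetical stand-ins for state objects (made-up values, deliberately unordered)
sample = [
    SimpleNamespace(type="RUNNING", name="Running",
                    timestamp=datetime(2025, 8, 17, 0, 0, 2, tzinfo=timezone.utc)),
    SimpleNamespace(type="PENDING", name="Pending",
                    timestamp=datetime(2025, 8, 17, 0, 0, 0, tzinfo=timezone.utc)),
    SimpleNamespace(type="COMPLETED", name="Completed",
                    timestamp=datetime(2025, 8, 17, 0, 0, 5, 500000, tzinfo=timezone.utc)),
]

for line in summarize_state_history(sample):
    print(line)
```

Sorting by timestamp first means the helper does not depend on the order in which the API happens to return states.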
Query via REST API
- If you prefer HTTP (or you’re outside Python), you can hit the REST endpoint:
# PREFECT_API_URL should point at your server/cloud API, e.g. https://api.prefect.cloud/api/accounts/.../workspaces/...
curl -H "Authorization: Bearer <token>" \
  "$PREFECT_API_URL/task_run_states/?task_run_id=<task_run_id>"
Docs for reference:
- Task run state history (REST): read_task_run_states
- Futures utilities: prefect.futures module
If you tell me whether you want states in-flight or post-run (and where you’re running: local server vs Cloud), I can tailor this further. I know, thrilling.
g
@Marvin how to get previous tasks and flows? Can you give me an example snippet for the use of the filters?
m
thought for 94.3 seconds
Assuming Prefect 3.x: you’ll use the client’s filter classes to query historical flow runs and task runs. Here are concise examples you can copy/paste.
Python client examples (name, state, time window, tags, sort, limit)
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterName,
    FlowRunFilterTags,
    FlowRunFilterStartTime,
    FlowRunFilterState,
    FlowRunFilterStateType,
    TaskRunFilter,
    TaskRunFilterName,
    TaskRunFilterTags,
    TaskRunFilterStartTime,
    TaskRunFilterState,
    TaskRunFilterStateType,
    TaskRunFilterFlowRunId,
)
from prefect.client.schemas.objects import StateType
from prefect.client.schemas.sorting import FlowRunSort, TaskRunSort

async def demo():
    since = datetime.now(timezone.utc) - timedelta(days=1)

    # Flow runs that match: name, tags, time window, and state types
    flow_filters = FlowRunFilter(
        name=FlowRunFilterName(any_=["daily_ingest"]),
        tags=FlowRunFilterTags(all_=["prod"]),
        start_time=FlowRunFilterStartTime(after_=since),
        state=FlowRunFilterState(
            type=FlowRunFilterStateType(any_=[StateType.COMPLETED, StateType.FAILED])
        ),
    )

    async with get_client() as client:
        # The client takes one keyword per filter kind (flow_run_filter, task_run_filter, ...)
        flow_runs = await client.read_flow_runs(
            flow_run_filter=flow_filters,
            limit=10,
            sort=FlowRunSort.START_TIME_DESC,
        )
        for fr in flow_runs:
            print(fr.id, fr.name, fr.state_type, fr.start_time)

        # Task runs for a specific flow run (failed in the last day)
        if flow_runs:
            tr_filters = TaskRunFilter(
                flow_run_id=TaskRunFilterFlowRunId(any_=[flow_runs[0].id]),
                start_time=TaskRunFilterStartTime(after_=since),
                state=TaskRunFilterState(type=TaskRunFilterStateType(any_=[StateType.FAILED])),
                # Optionally narrow further:
                # name=TaskRunFilterName(any_=["transform"]),
                # tags=TaskRunFilterTags(all_=["etl"]),
            )
            task_runs = await client.read_task_runs(
                task_run_filter=tr_filters,
                limit=50,
                # Task runs sort on expected start time; there is no START_TIME_DESC for them
                sort=TaskRunSort.EXPECTED_START_TIME_DESC,
            )
            for tr in task_runs:
                print(tr.id, tr.name, tr.state_type, tr.start_time)

asyncio.run(demo())
REST API equivalents (POST filter endpoints)
- Flow runs
curl -X POST "$PREFECT_API_URL/flow_runs/filter" \
  -H "Authorization: Bearer $PREFECT_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "limit": 10,
    "sort": "START_TIME_DESC",
    "flow_runs": {
      "name": { "any_": ["daily_ingest"] },
      "tags": { "all_": ["prod"] },
      "start_time": { "after_": "2025-08-17T00:00:00Z" },
      "state": { "type": { "any_": ["COMPLETED", "FAILED"] } }
    }
  }'
- Task runs
curl -X POST "$PREFECT_API_URL/task_runs/filter" \
  -H "Authorization: Bearer $PREFECT_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "limit": 50,
    "sort": "EXPECTED_START_TIME_DESC",
    "task_runs": {
      "flow_run_id": { "any_": ["<flow_run_id>"] },
      "start_time": { "after_": "2025-08-17T00:00:00Z" },
      "state": { "type": { "any_": ["FAILED"] } }
    }
  }'
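The request bodies above hard-code the start_time cutoff, which goes stale quickly. Here is a minimal sketch that builds the same kind of body with a rolling time window, using only the standard library. The function name build_filter_payload and its parameters are hypothetical; the JSON keys (flow_runs / task_runs, start_time.after_, state.type.any_) are the ones used in the curl examples.

```python
import json
from datetime import datetime, timedelta, timezone


def build_filter_payload(kind, state_types, lookback, now=None, limit=50):
    """Build a JSON body for POST /flow_runs/filter or POST /task_runs/filter.

    kind is "flow_runs" or "task_runs", i.e. the top-level filter key.
    """
    if kind not in ("flow_runs", "task_runs"):
        raise ValueError(f"unsupported kind: {kind!r}")
    # Normalize to UTC so the trailing "Z" in the timestamp is accurate
    now = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    since = (now - lookback).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "limit": limit,
        kind: {
            "start_time": {"after_": since},
            "state": {"type": {"any_": list(state_types)}},
        },
    }


# A fixed "now" keeps the example reproducible; omit it to use the current time.
body = build_filter_payload(
    "task_runs",
    state_types=["FAILED"],
    lookback=timedelta(days=1),
    now=datetime(2025, 8, 18, tzinfo=timezone.utc),
)
print(json.dumps(body, indent=2))
```

The resulting dict can be passed as the JSON body of the POST request; other criteria such as tags or flow_run_id would be added to the inner dict in the same way.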
Docs, in case you enjoy reading schemas:
- Filter classes: prefect.client.schemas.filters
- Orchestration client: prefect.client.orchestration
- REST: Flow runs API, Task runs API
If you share exactly what you want to filter on (names, tags, states, time range), I’ll tighten the snippet accordingly. With gusto, even.
g
@Marvin can you give an example using requests from python, implementing querying all tasks and filtering by tags
m
thought for 31.0 seconds
Here you go — pure requests, querying task runs (aka “previous tasks”) and filtering by tags. Assumes Prefect 3.x and that you’ve set PREFECT_API_URL and PREFECT_API_KEY.
import os
import requests

PREFECT_API_URL = os.getenv("PREFECT_API_URL", "http://127.0.0.1:4200/api")
PREFECT_API_KEY = os.getenv("PREFECT_API_KEY")  # required for Prefect Cloud

def get_task_runs_by_tags(tags_any=None, tags_all=None, page_size=200, max_pages=None):
    """
    Query task runs filtered by tags using Prefect's REST API with pagination.
    - tags_any: returns runs that have ANY of these tags
    - tags_all: returns runs that have ALL of these tags
    """
    headers = {
        "Content-Type": "application/json",
    }
    if PREFECT_API_KEY:
        headers["Authorization"] = f"Bearer {PREFECT_API_KEY}"

    url = f"{PREFECT_API_URL}/task_runs/filter"

    # Build the tag filter
    tag_filter = {}
    if tags_any:
        tag_filter["any_"] = tags_any
    if tags_all:
        tag_filter["all_"] = tags_all

    results = []
    offset = 0
    pages = 0

    while True:
        payload = {
            "task_runs": {
                "tags": tag_filter
            },
            # Adjust as needed:
            "limit": page_size,
            "offset": offset,
            "sort": "EXPECTED_START_TIME_DESC"
        }

        resp = requests.post(url, headers=headers, json=payload, timeout=30)
        if resp.status_code != 200:
            raise RuntimeError(f"Prefect API error {resp.status_code}: {resp.text}")

        batch = resp.json() or []
        results.extend(batch)

        if len(batch) < page_size:
            break  # no more pages

        offset += page_size
        pages += 1
        if max_pages is not None and pages >= max_pages:
            break

    return results

if __name__ == "__main__":
    # Example: any of these tags
    runs = get_task_runs_by_tags(tags_any=["prod", "billing"], page_size=100)
    print(f"Found {len(runs)} task runs")
    for tr in runs[:10]:
        print(tr["id"], tr.get("name"), tr.get("state_type"), tr.get("start_time"))
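If a tag operator turns out not to be accepted by your server version, the same any/all semantics can be applied client-side to the list that get_task_runs_by_tags returns. The sketch below assumes each task run dict carries a "tags" list; the helper name filter_runs_by_tags, its none_of parameter, and the sample records are made up for illustration.

```python
def filter_runs_by_tags(runs, any_of=None, all_of=None, none_of=None):
    """Keep runs whose "tags" list satisfies every criterion that was given."""
    any_of = set(any_of or ())
    all_of = set(all_of or ())
    none_of = set(none_of or ())
    kept = []
    for run in runs:
        tags = set(run.get("tags") or [])
        if any_of and not (tags & any_of):
            continue  # none of the wanted tags are present
        if not all_of <= tags:
            continue  # at least one required tag is missing
        if tags & none_of:
            continue  # an excluded tag is present
        kept.append(run)
    return kept


# Made-up records shaped like the dicts in a task-run filter response
sample_runs = [
    {"id": "a", "tags": ["prod", "billing"]},
    {"id": "b", "tags": ["prod"]},
    {"id": "c", "tags": ["dev", "billing"]},
    {"id": "d", "tags": []},
]

print([r["id"] for r in filter_runs_by_tags(sample_runs, any_of=["prod", "billing"])])
print([r["id"] for r in filter_runs_by_tags(sample_runs, all_of=["prod", "billing"])])
print([r["id"] for r in filter_runs_by_tags(sample_runs, any_of=["billing"], none_of=["dev"])])
```

Filtering on the server is still preferable when it works, since it avoids paging through runs you will discard; this is only a fallback.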
Notes
- For Prefect Cloud, PREFECT_API_URL should include your account/workspace path, e.g.: https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>
- Tag filter operators:
  - any_: match if a run has any of the listed tags
  - all_: match only if it has all of them
  - not_any_: exclude runs with any of the listed tags (if you need it)
- Operator support differs between filter classes and Prefect versions, so check any_ and not_any_ against the task-run tag filter schema before relying on them.
REST docs if you’d like to stare into the abyss:
- Task runs API: Task runs REST endpoints
- Filter schema reference: prefect.client.schemas.filters
If you also need to filter by time window or state while you’re at it, say so and I’ll add it. I live to serve. Begrudgingly.