<@ULVA73B9P> give me the api endpoint to get a tas...
# ask-marvin
y
@Marvin give me the api endpoint to get a task's status
m
thought for 60.9 seconds
In Prefect 3.x, use this endpoint to get a task run’s current status (state):
Copy code
GET {PREFECT_API_URL}/task_runs/{task_run_id}/state
Examples: - Local server:
<http://127.0.0.1:4200/api/task_runs/{task_run_id}/state>
- Prefect Cloud:
<https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/{task_run_id}/state>
cURL examples:
Copy code
# Local
curl <http://127.0.0.1:4200/api/task_runs/<TASK_RUN_ID>/state>

# Cloud
curl -H "Authorization: Bearer <PREFECT_API_KEY>" \
  <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/task_runs/<TASK_RUN_ID>/state>
The response includes the state object; the
type
field is the status (e.g., COMPLETED, RUNNING, FAILED). Docs: - Read Task Run State - Read Task Run (if you need metadata) Need help finding the
task_run_id
(e.g., by name or flow run)? I can show the filter query too.
y
@Marvin get all, i'm building an exporter
m
thought for 149.5 seconds
Got it — for exporting “all task run statuses,” here are the key Prefect 3.x REST endpoints you’ll want. List/paginate all task runs - Endpoint: - Local:
POST <http://127.0.0.1:4200/api/task_runs/paginate>
- Cloud:
POST <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/paginate>
- Docs: Paginate Task Runs - Notes: - Response shape:
TaskRunPaginationResponse
with
results
,
count
,
limit
,
pages
,
page
. -
results
entries are
TaskRunResponse
. Depending on server version,
state
may be included as the latest state; if not, fetch it via the state endpoint below. - Example request:
Copy code
# Local server
curl -X POST <http://127.0.0.1:4200/api/task_runs/paginate> \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "sort": "-created",         # optional; example of descending sort
    "flows": {},                # optional filter groups
    "flow_runs": {},
    "task_runs": {},
    "deployments": {}
  }'
Copy code
# Prefect Cloud
curl -X POST <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/task_runs/paginate> \
  -H "Authorization: Bearer <PREFECT_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "task_runs": {
      "start_time": { "since": "2025-01-01T00:00:00Z" }   # example incremental filter
    }
  }'
Get the latest status (state) for a task run - Endpoint: - Local:
GET <http://127.0.0.1:4200/api/task_runs/{task_run_id}/state>
- Cloud:
GET <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/{task_run_id}/state>
- Docs: Read Task Run State - Example:
Copy code
curl <http://127.0.0.1:4200/api/task_runs/<TASK_RUN_ID>/state>
Copy code
curl -H "Authorization: Bearer <PREFECT_API_KEY>" \
  <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/task_runs/<TASK_RUN_ID>/state>
Get the full state history for a task run - Endpoint: - Local:
GET <http://127.0.0.1:4200/api/task_runs/{task_run_id}/states>
- Cloud:
GET <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/{task_run_id}/states>
- Docs: Read Task Run States (history) Exporter pattern - Page through
/task_runs/paginate
until
page > pages
. - For each task run: - If
results[i].state
is present, that’s your latest status (e.g.,
type: "COMPLETED" | "FAILED" | "RUNNING"
, etc.). - If not present, call
GET /task_runs/{id}/state
. - For incremental exports, filter by a timestamp field (e.g.,
task_runs.start_time.since
or
task_runs.updated.since
) in the paginate body. If you tell me Cloud vs local and whether you want only the latest state or full histories, I’ll give you a drop-in cURL or Python snippet that paginates correctly. I know, thrilling.
y
@Marvin sorry i meant flow runs, i would like to get all flow runs with a state of pending.
m
thought for 1378.1 seconds
thought for 109.2 seconds
If you want all flow runs currently in a PENDING state: List/paginate flow runs filtered by state=PENDING - Endpoint: - Local:
POST <http://127.0.0.1:4200/api/flow_runs/paginate>
- Cloud:
POST <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/paginate>
- Docs: Paginate Flow Runs - Example (local):
Copy code
curl -X POST <http://127.0.0.1:4200/api/flow_runs/paginate> \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": {
      "state": {
        "type": { "any_": ["PENDING"] }
      }
    }
  }'
- Example (Cloud):
Copy code
curl -X POST <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate> \
  -H "Authorization: Bearer <PREFECT_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": {
      "state": {
        "type": { "any_": ["PENDING"] }
      }
    }
  }'
Notes - The response includes
results
(array of flow runs) plus pagination metadata (
count
,
pages
,
page
,
limit
). Each result typically includes the latest
state
. If your server doesn’t return
state
, fetch it per run via: - Local:
GET <http://127.0.0.1:4200/api/flow_runs/{flow_run_id}/state>
- Cloud:
GET <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/{flow_run_id}/state>
- Docs: Read Flow Run State Quick sanity check - If by “pending” you mean “not started yet,” you may also want SCHEDULED (and possibly AWAITINGRETRY/PAUSED depending on your definition). You can include multiple types:
Copy code
"flow_runs": {
  "state": {
    "type": { "any_": ["PENDING", "SCHEDULED"] }
  }
}
Want a ready-to-run exporter loop (Python or cURL) that pages until complete? Say the word. I’ll try to contain my excitement.
y
@Marvin ok how about a quick script that paginates through all flow runs in a state of PENDING
m
thought for 38.8 seconds
Here’s a tiny Python script that paginates all flow runs with state=PENDING and prints them as NDJSON. Works for local and Cloud.
Copy code
#!/usr/bin/env python3
import os
import sys
import json
import requests

# Config:
# - Local default: <http://127.0.0.1:4200/api>
# - Cloud: export PREFECT_API_URL="<https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>>"
BASE_URL = os.getenv("PREFECT_API_URL", "<http://127.0.0.1:4200/api>")
API_KEY = os.getenv("PREFECT_API_KEY")  # required for Cloud
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "200"))

headers = {"Content-Type": "application/json"}
if "api.prefect.cloud" in BASE_URL and API_KEY:
    headers["Authorization"] = f"Bearer {API_KEY}"

def main():
    page = 1
    total = 0

    while True:
        payload = {
            "page": page,
            "limit": PAGE_SIZE,
            "flow_runs": {
                "state": { "type": { "any_": ["PENDING"] } }
            }
        }

        resp = <http://requests.post|requests.post>(f"{BASE_URL}/flow_runs/paginate", headers=headers, json=payload, timeout=30)
        if resp.status_code != 200:
            print(f"Error {resp.status_code}: {resp.text}", file=sys.stderr)
            sys.exit(1)

        data = resp.json()
        results = data.get("results", [])
        if not results:
            break

        for fr in results:
            print(json.dumps(fr))  # NDJSON output
            total += 1

        pages = data.get("pages")
        if pages is not None and page >= pages:
            break
        page += 1

    print(f"# Exported {total} PENDING flow runs", file=sys.stderr)

if __name__ == "__main__":
    main()
Usage - Local server (no auth needed):
Copy code
python3 pending_flow_runs_export.py > flow_runs_pending.ndjson
- Prefect Cloud:
Copy code
export PREFECT_API_URL="<https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>>"
export PREFECT_API_KEY="<YOUR_API_KEY>"
python3 pending_flow_runs_export.py > flow_runs_pending.ndjson
Docs for reference: - Paginate Flow Runs If you also want SCHEDULED (often “not started yet”), swap the filter to
["PENDING", "SCHEDULED"]
. I know, living on the edge.
If you want all flow runs currently in a PENDING state, use the flow run pagination endpoint with a state filter. Endpoint - Local: POST http://127.0.0.1:4200/api/flow_runs/paginate - Cloud: POST https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/paginate - Docs: Paginate Flow Runs Filter for PENDING Request body (minimal):
Copy code
{
  "page": 1,
  "limit": 200,
  "flow_runs": {
    "state": {
      "type": {
        "any_": [
          "PENDING"
        ]
      }
    }
  }
}
cURL examples
Copy code
# Local
curl -X POST <http://127.0.0.1:4200/api/flow_runs/paginate> \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": { "state": { "type": { "any_": ["PENDING"] } } }
  }'
Copy code
# Cloud
curl -X POST <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate> \
  -H "Authorization: Bearer <PREFECT_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": { "state": { "type": { "any_": ["PENDING"] } } }
  }'
Notes - Page through until
page > pages
in the response to get them all. - Each result typically includes the latest state in a
state
field. If you need to double-check a run’s state, you can call: - Local:
GET <http://127.0.0.1:4200/api/flow_runs/{flow_run_id}/state>
- Cloud:
GET <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/{flow_run_id}/state>
- Docs: Read Flow Run State If you want an incremental exporter, add a since filter (e.g.,
flow_runs: { updated: { since: "2025-01-01T00:00:00Z" } }
) so you don’t replay history forever. Merciful, I know.
y
@Marvin i would like to get ALL flow runs in the past 7d
m
thought for 37.4 seconds
Do you mean “created in the last 7 days,” “started in the last 7 days,” or “updated in the last 7 days”? I’ll show all three—pick your poison. Endpoint - Local: POST http://127.0.0.1:4200/api/flow_runs/paginate - Cloud: POST https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/paginate - Docs: Paginate Flow Runs Example bodies - Created in last 7 days:
Copy code
{
  "page": 1,
  "limit": 200,
  "flow_runs": {
    "created": {
      "since": "<ISO8601_7_DAYS_AGO_UTC>"
    }
  }
}
- Started in last 7 days:
Copy code
{
  "page": 1,
  "limit": 200,
  "flow_runs": {
    "start_time": {
      "since": "<ISO8601_7_DAYS_AGO_UTC>"
    }
  }
}
- Updated in last 7 days (captures any runs “touched” in that period):
Copy code
{
  "page": 1,
  "limit": 200,
  "flow_runs": {
    "updated": {
      "since": "<ISO8601_7_DAYS_AGO_UTC>"
    }
  }
}
cURL examples - Local (created in last 7d):
Copy code
curl -X POST <http://127.0.0.1:4200/api/flow_runs/paginate> \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": { "created": { "since": "2025-08-11T00:00:00Z" } }
  }'
- Cloud (updated in last 7d):
Copy code
curl -X POST <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate> \
  -H "Authorization: Bearer <PREFECT_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": { "updated": { "since": "2025-08-11T00:00:00Z" } }
  }'
Python helper (computes 7d-ago automatically, NDJSON output)
Copy code
#!/usr/bin/env python3
import os, sys, json, requests
from datetime import datetime, timedelta, timezone

BASE_URL = os.getenv("PREFECT_API_URL", "<http://127.0.0.1:4200/api>")
API_KEY = os.getenv("PREFECT_API_KEY")
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "200"))
WINDOW = os.getenv("WINDOW", "created")  # created|start_time|updated

since = (datetime.now(timezone.utc) - timedelta(days=7)).replace(microsecond=0).isoformat().replace("+00:00","Z")

headers = {"Content-Type": "application/json"}
if "api.prefect.cloud" in BASE_URL and API_KEY:
    headers["Authorization"] = f"Bearer {API_KEY}"

page = 1
total = 0
while True:
    payload = {
        "page": page,
        "limit": PAGE_SIZE,
        "flow_runs": { WINDOW: { "since": since } }
    }
    r = <http://requests.post|requests.post>(f"{BASE_URL}/flow_runs/paginate", headers=headers, json=payload, timeout=30)
    r.raise_for_status()
    data = r.json()
    results = data.get("results", [])
    if not results:
        break
    for fr in results:
        print(json.dumps(fr))
        total += 1
    pages = data.get("pages")
    if pages is not None and page >= pages:
        break
    page += 1

print(f"# Exported {total} flow runs since {since} by {WINDOW}", file=sys.stderr)
Notes - Paginate until
page > pages
to get them all. - Each result typically includes the latest `state`; if not, fetch it via: - Local: GET /flow_runs/{flow_run_id}/state - Cloud: GET /api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/{flow_run_id}/state - Docs: Read Flow Run State Tell me which dimension (created/started/updated) you actually want, and I’ll lock the script to that. I’ll try to contain my enthusiasm.
y
@Marvin please also exclude flow runs that are scheduled
m
thought for 25.3 seconds
Got it — last 7 days, excluding SCHEDULED. cURL (local, using updated since 7d ago)
Copy code
SINCE=$(date -u -d '7 days ago' +%Y-%m-%dT%H:%M:%SZ)

curl -X POST <http://127.0.0.1:4200/api/flow_runs/paginate> \
  -H "Content-Type: application/json" \
  -d "{
    \"page\": 1,
    \"limit\": 200,
    \"flow_runs\": {
      \"updated\": { \"since\": \"${SINCE}\" },
      \"state\": { \"type\": { \"not_any_\": [\"SCHEDULED\"] } }
    }
  }"
cURL (Cloud)
Copy code
SINCE=$(date -u -d '7 days ago' +%Y-%m-%dT%H:%M:%SZ)

curl -X POST <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate> \
  -H "Authorization: Bearer <PREFECT_API_KEY>" \
  -H "Content-Type: application/json" \
  -d "{
    \"page\": 1,
    \"limit\": 200,
    \"flow_runs\": {
      \"updated\": { \"since\": \"${SINCE}\" },
      \"state\": { \"type\": { \"not_any_\": [\"SCHEDULED\"] } }
    }
  }"
Tiny Python exporter (NDJSON), excludes SCHEDULED
Copy code
#!/usr/bin/env python3
import os, sys, json, requests
from datetime import datetime, timedelta, timezone

BASE_URL = os.getenv("PREFECT_API_URL", "<http://127.0.0.1:4200/api>")
API_KEY = os.getenv("PREFECT_API_KEY")
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "200"))
WINDOW = os.getenv("WINDOW", "updated")  # created|start_time|updated

since = (datetime.now(timezone.utc) - timedelta(days=7)).replace(microsecond=0).isoformat().replace("+00:00","Z")

headers = {"Content-Type": "application/json"}
if "api.prefect.cloud" in BASE_URL and API_KEY:
    headers["Authorization"] = f"Bearer {API_KEY}"

page, total = 1, 0
while True:
    payload = {
        "page": page,
        "limit": PAGE_SIZE,
        "flow_runs": {
            WINDOW: {"since": since},
            "state": {"type": {"not_any_": ["SCHEDULED"]}}
        }
    }
    r = <http://requests.post|requests.post>(f"{BASE_URL}/flow_runs/paginate", headers=headers, json=payload, timeout=30)
    if r.status_code != 200:
        print(f"Error {r.status_code}: {r.text}", file=sys.stderr)
        sys.exit(1)
    data = r.json()
    results = data.get("results", [])
    if not results:
        break
    for fr in results:
        print(json.dumps(fr))
        total += 1
    pages = data.get("pages")
    if pages is not None and page >= pages:
        break
    page += 1

print(f"# Exported {total} flow runs since {since} by {WINDOW}, excluding SCHEDULED", file=sys.stderr)
Docs: Paginate Flow Runs If you prefer “created in last 7d” or “started in last 7d,” set WINDOW to created or start_time. I won’t be offended.