Yu Shao Pang
08/18/2025, 12:55 PMMarvin
08/18/2025, 12:55 PMMarvin
08/18/2025, 12:56 PMGET {PREFECT_API_URL}/task_runs/{task_run_id}/state
Examples:
- Local server: <http://127.0.0.1:4200/api/task_runs/{task_run_id}/state>
- Prefect Cloud: <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/{task_run_id}/state>
cURL examples:
# Local
curl <http://127.0.0.1:4200/api/task_runs/<TASK_RUN_ID>/state>
# Cloud
curl -H "Authorization: Bearer <PREFECT_API_KEY>" \
<https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/task_runs/<TASK_RUN_ID>/state>
The response includes the state object; the type field is the status (e.g., COMPLETED, RUNNING, FAILED).
Docs:
- Read Task Run State
- Read Task Run (if you need metadata)
Need help finding the task_run_id (e.g., by name or flow run)? I can show the filter query too.Yu Shao Pang
08/18/2025, 12:57 PMMarvin
08/18/2025, 12:57 PMMarvin
08/18/2025, 12:59 PMPOST <http://127.0.0.1:4200/api/task_runs/paginate>
- Cloud: POST <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/paginate>
- Docs: Paginate Task Runs
- Notes:
- Response shape: TaskRunPaginationResponse with results, count, limit, pages, page.
- results entries are TaskRunResponse. Depending on server version, state may be included as the latest state; if not, fetch it via the state endpoint below.
- Example request:
# Local server
curl -X POST <http://127.0.0.1:4200/api/task_runs/paginate> \
-H "Content-Type: application/json" \
-d '{
"page": 1,
"limit": 200,
"sort": "-created", # optional; example of descending sort
"flows": {}, # optional filter groups
"flow_runs": {},
"task_runs": {},
"deployments": {}
}'
# Prefect Cloud
curl -X POST <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/task_runs/paginate> \
-H "Authorization: Bearer <PREFECT_API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"page": 1,
"limit": 200,
"task_runs": {
"start_time": { "since": "2025-01-01T00:00:00Z" } # example incremental filter
}
}'
Get the latest status (state) for a task run
- Endpoint:
- Local: GET <http://127.0.0.1:4200/api/task_runs/{task_run_id}/state>
- Cloud: GET <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/{task_run_id}/state>
- Docs: Read Task Run State
- Example:
curl <http://127.0.0.1:4200/api/task_runs/<TASK_RUN_ID>/state>
curl -H "Authorization: Bearer <PREFECT_API_KEY>" \
<https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/task_runs/<TASK_RUN_ID>/state>
Get the full state history for a task run
- Endpoint:
- Local: GET <http://127.0.0.1:4200/api/task_runs/{task_run_id}/states>
- Cloud: GET <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/{task_run_id}/states>
- Docs: Read Task Run States (history)
Exporter pattern
- Page through /task_runs/paginate until page > pages.
- For each task run:
- If results[i].state is present, that’s your latest status (e.g., type: "COMPLETED" | "FAILED" | "RUNNING", etc.).
- If not present, call GET /task_runs/{id}/state.
- For incremental exports, filter by a timestamp field (e.g., task_runs.start_time.since or task_runs.updated.since) in the paginate body.
If you tell me Cloud vs local and whether you want only the latest state or full histories, I’ll give you a drop-in cURL or Python snippet that paginates correctly. I know, thrilling.Yu Shao Pang
08/18/2025, 1:01 PMMarvin
08/18/2025, 1:01 PMMarvin
08/18/2025, 1:11 PMMarvin
08/18/2025, 1:13 PMPOST <http://127.0.0.1:4200/api/flow_runs/paginate>
- Cloud: POST <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/paginate>
- Docs: Paginate Flow Runs
- Example (local):
curl -X POST <http://127.0.0.1:4200/api/flow_runs/paginate> \
-H "Content-Type: application/json" \
-d '{
"page": 1,
"limit": 200,
"flow_runs": {
"state": {
"type": { "any_": ["PENDING"] }
}
}
}'
- Example (Cloud):
curl -X POST <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate> \
-H "Authorization: Bearer <PREFECT_API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"page": 1,
"limit": 200,
"flow_runs": {
"state": {
"type": { "any_": ["PENDING"] }
}
}
}'
Notes
- The response includes results (array of flow runs) plus pagination metadata (count, pages, page, limit). Each result typically includes the latest state. If your server doesn’t return state, fetch it per run via:
- Local: GET <http://127.0.0.1:4200/api/flow_runs/{flow_run_id}/state>
- Cloud: GET <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/{flow_run_id}/state>
- Docs: Read Flow Run State
Quick sanity check
- If by “pending” you mean “not started yet,” you may also want SCHEDULED (and possibly AWAITINGRETRY/PAUSED depending on your definition). You can include multiple types:
"flow_runs": {
"state": {
"type": { "any_": ["PENDING", "SCHEDULED"] }
}
}
Want a ready-to-run exporter loop (Python or cURL) that pages until complete? Say the word. I’ll try to contain my excitement.Yu Shao Pang
08/18/2025, 1:16 PMMarvin
08/18/2025, 1:16 PMMarvin
08/18/2025, 1:17 PM#!/usr/bin/env python3
import os
import sys
import json
import requests
# Config:
# - Local default: <http://127.0.0.1:4200/api>
# - Cloud: export PREFECT_API_URL="<https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>>"
BASE_URL = os.getenv("PREFECT_API_URL", "<http://127.0.0.1:4200/api>")
API_KEY = os.getenv("PREFECT_API_KEY") # required for Cloud
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "200"))
headers = {"Content-Type": "application/json"}
if "api.prefect.cloud" in BASE_URL and API_KEY:
headers["Authorization"] = f"Bearer {API_KEY}"
def main():
page = 1
total = 0
while True:
payload = {
"page": page,
"limit": PAGE_SIZE,
"flow_runs": {
"state": { "type": { "any_": ["PENDING"] } }
}
}
resp = <http://requests.post|requests.post>(f"{BASE_URL}/flow_runs/paginate", headers=headers, json=payload, timeout=30)
if resp.status_code != 200:
print(f"Error {resp.status_code}: {resp.text}", file=sys.stderr)
sys.exit(1)
data = resp.json()
results = data.get("results", [])
if not results:
break
for fr in results:
print(json.dumps(fr)) # NDJSON output
total += 1
pages = data.get("pages")
if pages is not None and page >= pages:
break
page += 1
print(f"# Exported {total} PENDING flow runs", file=sys.stderr)
if __name__ == "__main__":
main()
Usage
- Local server (no auth needed):
python3 pending_flow_runs_export.py > flow_runs_pending.ndjson
- Prefect Cloud:
export PREFECT_API_URL="<https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>>"
export PREFECT_API_KEY="<YOUR_API_KEY>"
python3 pending_flow_runs_export.py > flow_runs_pending.ndjson
Docs for reference:
- Paginate Flow Runs
If you also want SCHEDULED (often “not started yet”), swap the filter to ["PENDING", "SCHEDULED"]. I know, living on the edge.Marvin
08/18/2025, 1:24 PM{
"page": 1,
"limit": 200,
"flow_runs": {
"state": {
"type": {
"any_": [
"PENDING"
]
}
}
}
}
cURL examples
# Local
curl -X POST <http://127.0.0.1:4200/api/flow_runs/paginate> \
-H "Content-Type: application/json" \
-d '{
"page": 1,
"limit": 200,
"flow_runs": { "state": { "type": { "any_": ["PENDING"] } } }
}'
# Cloud
curl -X POST <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate> \
-H "Authorization: Bearer <PREFECT_API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"page": 1,
"limit": 200,
"flow_runs": { "state": { "type": { "any_": ["PENDING"] } } }
}'
Notes
- Page through until page > pages in the response to get them all.
- Each result typically includes the latest state in a state field. If you need to double-check a run’s state, you can call:
- Local: GET <http://127.0.0.1:4200/api/flow_runs/{flow_run_id}/state>
- Cloud: GET <https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/{flow_run_id}/state>
- Docs: Read Flow Run State
If you want an incremental exporter, add a since filter (e.g., flow_runs: { updated: { since: "2025-01-01T00:00:00Z" } }) so you don’t replay history forever. Merciful, I know.Yu Shao Pang
08/18/2025, 5:16 PMMarvin
08/18/2025, 5:16 PMMarvin
08/18/2025, 5:17 PM{
"page": 1,
"limit": 200,
"flow_runs": {
"created": {
"since": "<ISO8601_7_DAYS_AGO_UTC>"
}
}
}
- Started in last 7 days:
{
"page": 1,
"limit": 200,
"flow_runs": {
"start_time": {
"since": "<ISO8601_7_DAYS_AGO_UTC>"
}
}
}
- Updated in last 7 days (captures any runs “touched” in that period):
{
"page": 1,
"limit": 200,
"flow_runs": {
"updated": {
"since": "<ISO8601_7_DAYS_AGO_UTC>"
}
}
}
cURL examples
- Local (created in last 7d):
curl -X POST <http://127.0.0.1:4200/api/flow_runs/paginate> \
-H "Content-Type: application/json" \
-d '{
"page": 1,
"limit": 200,
"flow_runs": { "created": { "since": "2025-08-11T00:00:00Z" } }
}'
- Cloud (updated in last 7d):
curl -X POST <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate> \
-H "Authorization: Bearer <PREFECT_API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"page": 1,
"limit": 200,
"flow_runs": { "updated": { "since": "2025-08-11T00:00:00Z" } }
}'
Python helper (computes 7d-ago automatically, NDJSON output)
#!/usr/bin/env python3
import os, sys, json, requests
from datetime import datetime, timedelta, timezone
BASE_URL = os.getenv("PREFECT_API_URL", "<http://127.0.0.1:4200/api>")
API_KEY = os.getenv("PREFECT_API_KEY")
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "200"))
WINDOW = os.getenv("WINDOW", "created") # created|start_time|updated
since = (datetime.now(timezone.utc) - timedelta(days=7)).replace(microsecond=0).isoformat().replace("+00:00","Z")
headers = {"Content-Type": "application/json"}
if "api.prefect.cloud" in BASE_URL and API_KEY:
headers["Authorization"] = f"Bearer {API_KEY}"
page = 1
total = 0
while True:
payload = {
"page": page,
"limit": PAGE_SIZE,
"flow_runs": { WINDOW: { "since": since } }
}
r = <http://requests.post|requests.post>(f"{BASE_URL}/flow_runs/paginate", headers=headers, json=payload, timeout=30)
r.raise_for_status()
data = r.json()
results = data.get("results", [])
if not results:
break
for fr in results:
print(json.dumps(fr))
total += 1
pages = data.get("pages")
if pages is not None and page >= pages:
break
page += 1
print(f"# Exported {total} flow runs since {since} by {WINDOW}", file=sys.stderr)
Notes
- Paginate until page > pages to get them all.
- Each result typically includes the latest `state`; if not, fetch it via:
- Local: GET /flow_runs/{flow_run_id}/state
- Cloud: GET /api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/{flow_run_id}/state
- Docs: Read Flow Run State
Tell me which dimension (created/started/updated) you actually want, and I’ll lock the script to that. I’ll try to contain my enthusiasm.Yu Shao Pang
08/18/2025, 5:30 PMMarvin
08/18/2025, 5:30 PMMarvin
08/18/2025, 5:30 PMSINCE=$(date -u -d '7 days ago' +%Y-%m-%dT%H:%M:%SZ)
curl -X POST <http://127.0.0.1:4200/api/flow_runs/paginate> \
-H "Content-Type: application/json" \
-d "{
\"page\": 1,
\"limit\": 200,
\"flow_runs\": {
\"updated\": { \"since\": \"${SINCE}\" },
\"state\": { \"type\": { \"not_any_\": [\"SCHEDULED\"] } }
}
}"
cURL (Cloud)
SINCE=$(date -u -d '7 days ago' +%Y-%m-%dT%H:%M:%SZ)
curl -X POST <https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate> \
-H "Authorization: Bearer <PREFECT_API_KEY>" \
-H "Content-Type: application/json" \
-d "{
\"page\": 1,
\"limit\": 200,
\"flow_runs\": {
\"updated\": { \"since\": \"${SINCE}\" },
\"state\": { \"type\": { \"not_any_\": [\"SCHEDULED\"] } }
}
}"
Tiny Python exporter (NDJSON), excludes SCHEDULED
#!/usr/bin/env python3
import os, sys, json, requests
from datetime import datetime, timedelta, timezone
BASE_URL = os.getenv("PREFECT_API_URL", "<http://127.0.0.1:4200/api>")
API_KEY = os.getenv("PREFECT_API_KEY")
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "200"))
WINDOW = os.getenv("WINDOW", "updated") # created|start_time|updated
since = (datetime.now(timezone.utc) - timedelta(days=7)).replace(microsecond=0).isoformat().replace("+00:00","Z")
headers = {"Content-Type": "application/json"}
if "api.prefect.cloud" in BASE_URL and API_KEY:
headers["Authorization"] = f"Bearer {API_KEY}"
page, total = 1, 0
while True:
payload = {
"page": page,
"limit": PAGE_SIZE,
"flow_runs": {
WINDOW: {"since": since},
"state": {"type": {"not_any_": ["SCHEDULED"]}}
}
}
r = <http://requests.post|requests.post>(f"{BASE_URL}/flow_runs/paginate", headers=headers, json=payload, timeout=30)
if r.status_code != 200:
print(f"Error {r.status_code}: {r.text}", file=sys.stderr)
sys.exit(1)
data = r.json()
results = data.get("results", [])
if not results:
break
for fr in results:
print(json.dumps(fr))
total += 1
pages = data.get("pages")
if pages is not None and page >= pages:
break
page += 1
print(f"# Exported {total} flow runs since {since} by {WINDOW}, excluding SCHEDULED", file=sys.stderr)
Docs: Paginate Flow Runs
If you prefer “created in last 7d” or “started in last 7d,” set WINDOW to created or start_time. I won’t be offended.