Yu Shao Pang
08/18/2025, 12:55 PM
Marvin
08/18/2025, 12:55 PM
Marvin
08/18/2025, 12:56 PM
GET {PREFECT_API_URL}/task_runs/{task_run_id}/state
Examples:
- Local server: http://127.0.0.1:4200/api/task_runs/{task_run_id}/state
- Prefect Cloud: https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/{task_run_id}/state
cURL examples:
# Local
curl "http://127.0.0.1:4200/api/task_runs/<TASK_RUN_ID>/state"
# Cloud
curl -H "Authorization: Bearer <PREFECT_API_KEY>" \
  "https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/task_runs/<TASK_RUN_ID>/state"
The response includes the state object; the type field is the status (e.g., COMPLETED, RUNNING, FAILED).
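As a small illustration of reading that response, here is a minimal sketch that pulls the status out of a state object. The sample payload is hypothetical and trimmed to the type field mentioned above plus an assumed name field; status_from_state is an illustrative helper, not part of Prefect.

```python
import json


def status_from_state(state):
    """Return the state's type (e.g. "COMPLETED"), or None when no state is given."""
    if not state:
        return None
    return state.get("type")


# Hypothetical response body, trimmed to the fields used here.
sample = json.loads('{"type": "COMPLETED", "name": "Completed"}')
print(status_from_state(sample))
```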
Docs:
- Read Task Run State
- Read Task Run (if you need metadata)
Need help finding the task_run_id (e.g., by name or flow run)? I can show the filter query too.
Yu Shao Pang
08/18/2025, 12:57 PM
Marvin
08/18/2025, 12:57 PM
Marvin
08/18/2025, 12:59 PM
POST http://127.0.0.1:4200/api/task_runs/paginate
- Cloud: POST https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/paginate
- Docs: Paginate Task Runs
- Notes:
  - Response shape: TaskRunPaginationResponse with results, count, limit, pages, page.
  - results entries are TaskRunResponse. Depending on server version, state may be included as the latest state; if not, fetch it via the state endpoint below.
- Example request (JSON does not allow comments, so the notes live here: sort is optional and shows a descending sort; the flows, flow_runs, task_runs, and deployments filter groups are optional too):
# Local server
curl -X POST "http://127.0.0.1:4200/api/task_runs/paginate" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "sort": "-created",
    "flows": {},
    "flow_runs": {},
    "task_runs": {},
    "deployments": {}
  }'
# Prefect Cloud (the start_time filter is an example of an incremental filter)
curl -X POST "https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/task_runs/paginate" \
  -H "Authorization: Bearer <PREFECT_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "task_runs": {
      "start_time": { "since": "2025-01-01T00:00:00Z" }
    }
  }'
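The local and Cloud requests above differ only in their base URL, so a script can build both from one place. A minimal sketch, assuming the URL layout shown in this thread; api_base and paginate_url are hypothetical helper names.

```python
def api_base(account_id=None, workspace_id=None, local="http://127.0.0.1:4200/api"):
    """Return the Cloud base URL when both IDs are given, else the local server URL."""
    if account_id and workspace_id:
        return (
            "https://api.prefect.cloud/api"
            f"/accounts/{account_id}/workspaces/{workspace_id}"
        )
    return local


def paginate_url(resource, account_id=None, workspace_id=None):
    """Build the paginate endpoint for a resource such as "task_runs" or "flow_runs"."""
    return f"{api_base(account_id, workspace_id)}/{resource}/paginate"


print(paginate_url("task_runs"))
print(paginate_url("task_runs", account_id="ACCOUNT_ID", workspace_id="WORKSPACE_ID"))
```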
Get the latest status (state) for a task run
- Endpoint:
  - Local: GET http://127.0.0.1:4200/api/task_runs/{task_run_id}/state
  - Cloud: GET https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/{task_run_id}/state
- Docs: Read Task Run State
- Example:
curl "http://127.0.0.1:4200/api/task_runs/<TASK_RUN_ID>/state"
curl -H "Authorization: Bearer <PREFECT_API_KEY>" \
  "https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/task_runs/<TASK_RUN_ID>/state"
Get the full state history for a task run
- Endpoint:
  - Local: GET http://127.0.0.1:4200/api/task_runs/{task_run_id}/states
  - Cloud: GET https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/task_runs/{task_run_id}/states
- Docs: Read Task Run States (history)
Exporter pattern
- Page through /task_runs/paginate until page > pages.
- For each task run:
  - If results[i].state is present, that’s your latest status (e.g., type: "COMPLETED" | "FAILED" | "RUNNING", etc.).
  - If not present, call GET /task_runs/{id}/state.
- For incremental exports, filter by a timestamp field (e.g., task_runs.start_time.since or task_runs.updated.since) in the paginate body.
If you tell me Cloud vs local and whether you want only the latest state or full histories, I’ll give you a drop-in cURL or Python snippet that paginates correctly. I know, thrilling.
Yu Shao Pang
08/18/2025, 1:01 PM
Marvin
08/18/2025, 1:01 PM
Marvin
08/18/2025, 1:11 PM
Marvin
08/18/2025, 1:13 PM
POST http://127.0.0.1:4200/api/flow_runs/paginate
- Cloud: POST https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/paginate
- Docs: Paginate Flow Runs
- Example (local):
curl -X POST "http://127.0.0.1:4200/api/flow_runs/paginate" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": {
      "state": {
        "type": { "any_": ["PENDING"] }
      }
    }
  }'
- Example (Cloud):
curl -X POST "https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate" \
  -H "Authorization: Bearer <PREFECT_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": {
      "state": {
        "type": { "any_": ["PENDING"] }
      }
    }
  }'
Notes
- The response includes results (array of flow runs) plus pagination metadata (count, pages, page, limit). Each result typically includes the latest state. If your server doesn’t return state, fetch it per run via:
  - Local: GET http://127.0.0.1:4200/api/flow_runs/{flow_run_id}/state
  - Cloud: GET https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/{flow_run_id}/state
  - Docs: Read Flow Run State
Quick sanity check
- If by “pending” you mean “not started yet,” you may also want SCHEDULED (and possibly AWAITINGRETRY/PAUSED depending on your definition). You can include multiple types:
"flow_runs": {
  "state": {
    "type": { "any_": ["PENDING", "SCHEDULED"] }
  }
}
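If the set of state types changes often, the filter fragment can be generated rather than hand-edited. A minimal sketch, assuming the any_ and not_any_ operators used in this thread; state_type_filter is a hypothetical helper name.

```python
def state_type_filter(include=None, exclude=None):
    """Build the flow_runs state-type filter fragment for a paginate body."""
    type_filter = {}
    if include:
        type_filter["any_"] = list(include)
    if exclude:
        type_filter["not_any_"] = list(exclude)
    return {"flow_runs": {"state": {"type": type_filter}}}


# Merge the fragment into a paginate body.
body = {"page": 1, "limit": 200, **state_type_filter(include=["PENDING", "SCHEDULED"])}
print(body)
```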
Want a ready-to-run exporter loop (Python or cURL) that pages until complete? Say the word. I’ll try to contain my excitement.
Yu Shao Pang
08/18/2025, 1:16 PM
Marvin
08/18/2025, 1:16 PM
Marvin
08/18/2025, 1:17 PM
#!/usr/bin/env python3
import os
import sys
import json
import requests

# Config:
# - Local default: http://127.0.0.1:4200/api
# - Cloud: export PREFECT_API_URL="https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>"
BASE_URL = os.getenv("PREFECT_API_URL", "http://127.0.0.1:4200/api")
API_KEY = os.getenv("PREFECT_API_KEY")  # required for Cloud
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "200"))

headers = {"Content-Type": "application/json"}
if "api.prefect.cloud" in BASE_URL and API_KEY:
    headers["Authorization"] = f"Bearer {API_KEY}"


def main():
    page = 1
    total = 0
    while True:
        payload = {
            "page": page,
            "limit": PAGE_SIZE,
            "flow_runs": {
                "state": {"type": {"any_": ["PENDING"]}}
            }
        }
        resp = requests.post(f"{BASE_URL}/flow_runs/paginate", headers=headers, json=payload, timeout=30)
        if resp.status_code != 200:
            print(f"Error {resp.status_code}: {resp.text}", file=sys.stderr)
            sys.exit(1)
        data = resp.json()
        results = data.get("results", [])
        if not results:
            break
        for fr in results:
            print(json.dumps(fr))  # NDJSON output
            total += 1
        pages = data.get("pages")
        if pages is not None and page >= pages:
            break
        page += 1
    print(f"# Exported {total} PENDING flow runs", file=sys.stderr)


if __name__ == "__main__":
    main()
Usage
- Local server (no auth needed):
python3 pending_flow_runs_export.py > flow_runs_pending.ndjson
- Prefect Cloud:
export PREFECT_API_URL="https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>"
export PREFECT_API_KEY="<YOUR_API_KEY>"
python3 pending_flow_runs_export.py > flow_runs_pending.ndjson
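The script only attaches the Authorization header when the URL points at Prefect Cloud and a key is set, so a forgotten key shows up later as an HTTP error. One possible variation, sketched here, fails fast instead. build_headers is a hypothetical helper name, and the URL check mirrors the substring test the script already uses.

```python
def build_headers(base_url, api_key=None):
    """Return request headers, requiring an API key for Prefect Cloud URLs."""
    headers = {"Content-Type": "application/json"}
    if "api.prefect.cloud" in base_url:
        if not api_key:
            raise ValueError("PREFECT_API_KEY must be set when targeting Prefect Cloud")
        headers["Authorization"] = f"Bearer {api_key}"
    return headers


print(build_headers("http://127.0.0.1:4200/api"))
```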
Docs for reference:
- Paginate Flow Runs
If you also want SCHEDULED (often “not started yet”), swap the filter to ["PENDING", "SCHEDULED"]. I know, living on the edge.
Marvin
08/18/2025, 1:24 PM
{
  "page": 1,
  "limit": 200,
  "flow_runs": {
    "state": {
      "type": {
        "any_": [
          "PENDING"
        ]
      }
    }
  }
}
cURL examples
# Local
curl -X POST "http://127.0.0.1:4200/api/flow_runs/paginate" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": { "state": { "type": { "any_": ["PENDING"] } } }
  }'
# Cloud
curl -X POST "https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate" \
  -H "Authorization: Bearer <PREFECT_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": { "state": { "type": { "any_": ["PENDING"] } } }
  }'
Notes
- Page through until page > pages in the response to get them all.
- Each result typically includes the latest state in a state field. If you need to double-check a run’s state, you can call:
  - Local: GET http://127.0.0.1:4200/api/flow_runs/{flow_run_id}/state
  - Cloud: GET https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/{flow_run_id}/state
  - Docs: Read Flow Run State
If you want an incremental exporter, add a since filter (e.g., flow_runs: { updated: { since: "2025-01-01T00:00:00Z" } }) so you don’t replay history forever. Merciful, I know.
Yu Shao Pang
08/18/2025, 5:16 PM
Marvin
08/18/2025, 5:16 PM
Marvin
08/18/2025, 5:17 PM
{
  "page": 1,
  "limit": 200,
  "flow_runs": {
    "created": {
      "since": "<ISO8601_7_DAYS_AGO_UTC>"
    }
  }
}
- Started in last 7 days:
{
  "page": 1,
  "limit": 200,
  "flow_runs": {
    "start_time": {
      "since": "<ISO8601_7_DAYS_AGO_UTC>"
    }
  }
}
- Updated in last 7 days (captures any runs “touched” in that period):
{
  "page": 1,
  "limit": 200,
  "flow_runs": {
    "updated": {
      "since": "<ISO8601_7_DAYS_AGO_UTC>"
    }
  }
}
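The <ISO8601_7_DAYS_AGO_UTC> placeholder in these bodies has to be computed at run time. A minimal sketch of one way to fill it in; iso_utc_days_ago and window_body are hypothetical helper names, and the field names and the since operator are taken as given from the bodies above, so confirm them against your server's API reference before relying on them.

```python
from datetime import datetime, timedelta, timezone


def iso_utc_days_ago(days, now=None):
    """Return an ISO 8601 UTC timestamp (second precision, Z suffix) for `days` ago."""
    now = now or datetime.now(timezone.utc)
    cutoff = now.astimezone(timezone.utc) - timedelta(days=days)
    return cutoff.replace(microsecond=0).strftime("%Y-%m-%dT%H:%M:%SZ")


def window_body(field, days=7, page=1, limit=200, now=None):
    """Build a paginate body filtering flow runs on one timestamp field."""
    if field not in ("created", "start_time", "updated"):
        raise ValueError(f"unsupported window field: {field}")
    return {
        "page": page,
        "limit": limit,
        "flow_runs": {field: {"since": iso_utc_days_ago(days, now)}},
    }


print(window_body("created"))
```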
cURL examples
- Local (created in last 7d):
curl -X POST "http://127.0.0.1:4200/api/flow_runs/paginate" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": { "created": { "since": "2025-08-11T00:00:00Z" } }
  }'
- Cloud (updated in last 7d):
curl -X POST "https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate" \
  -H "Authorization: Bearer <PREFECT_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "page": 1,
    "limit": 200,
    "flow_runs": { "updated": { "since": "2025-08-11T00:00:00Z" } }
  }'
Python helper (computes 7d-ago automatically, NDJSON output)
#!/usr/bin/env python3
import os, sys, json, requests
from datetime import datetime, timedelta, timezone

BASE_URL = os.getenv("PREFECT_API_URL", "http://127.0.0.1:4200/api")
API_KEY = os.getenv("PREFECT_API_KEY")
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "200"))
WINDOW = os.getenv("WINDOW", "created")  # created|start_time|updated
since = (datetime.now(timezone.utc) - timedelta(days=7)).replace(microsecond=0).isoformat().replace("+00:00", "Z")

headers = {"Content-Type": "application/json"}
if "api.prefect.cloud" in BASE_URL and API_KEY:
    headers["Authorization"] = f"Bearer {API_KEY}"

page = 1
total = 0
while True:
    payload = {
        "page": page,
        "limit": PAGE_SIZE,
        "flow_runs": {WINDOW: {"since": since}}
    }
    r = requests.post(f"{BASE_URL}/flow_runs/paginate", headers=headers, json=payload, timeout=30)
    r.raise_for_status()
    data = r.json()
    results = data.get("results", [])
    if not results:
        break
    for fr in results:
        print(json.dumps(fr))
        total += 1
    pages = data.get("pages")
    if pages is not None and page >= pages:
        break
    page += 1
print(f"# Exported {total} flow runs since {since} by {WINDOW}", file=sys.stderr)
Notes
- Paginate until page > pages to get them all.
- Each result typically includes the latest `state`; if not, fetch it via:
- Local: GET /flow_runs/{flow_run_id}/state
- Cloud: GET /api/accounts/{account_id}/workspaces/{workspace_id}/flow_runs/{flow_run_id}/state
- Docs: Read Flow Run State
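Since each exported line is one flow run as JSON, a quick way to check what came back is to tally the embedded state types. A minimal sketch, assuming each record carries the state object with a type field as described in these notes; tally_states is a hypothetical helper and the sample lines are made up.

```python
import json
from collections import Counter


def tally_states(lines):
    """Count flow runs per state type from an iterable of NDJSON lines."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        run = json.loads(line)
        state = run.get("state") or {}
        counts[state.get("type", "UNKNOWN")] += 1
    return counts


# Made-up records standing in for exporter output.
sample_lines = [
    '{"id": "a", "state": {"type": "COMPLETED"}}',
    '{"id": "b", "state": {"type": "FAILED"}}',
    '{"id": "c", "state": null}',
]
print(dict(tally_states(sample_lines)))
```

An open file works as the lines argument too, because iterating a file yields one line at a time.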
Tell me which dimension (created/started/updated) you actually want, and I’ll lock the script to that. I’ll try to contain my enthusiasm.
Yu Shao Pang
08/18/2025, 5:30 PM
Marvin
08/18/2025, 5:30 PM
Marvin
08/18/2025, 5:30 PM
# GNU date syntax; on macOS/BSD use: date -u -v-7d +%Y-%m-%dT%H:%M:%SZ
SINCE=$(date -u -d '7 days ago' +%Y-%m-%dT%H:%M:%SZ)
curl -X POST "http://127.0.0.1:4200/api/flow_runs/paginate" \
  -H "Content-Type: application/json" \
  -d "{
    \"page\": 1,
    \"limit\": 200,
    \"flow_runs\": {
      \"updated\": { \"since\": \"${SINCE}\" },
      \"state\": { \"type\": { \"not_any_\": [\"SCHEDULED\"] } }
    }
  }"
cURL (Cloud)
SINCE=$(date -u -d '7 days ago' +%Y-%m-%dT%H:%M:%SZ)
curl -X POST "https://api.prefect.cloud/api/accounts/<ACCOUNT_ID>/workspaces/<WORKSPACE_ID>/flow_runs/paginate" \
  -H "Authorization: Bearer <PREFECT_API_KEY>" \
  -H "Content-Type: application/json" \
  -d "{
    \"page\": 1,
    \"limit\": 200,
    \"flow_runs\": {
      \"updated\": { \"since\": \"${SINCE}\" },
      \"state\": { \"type\": { \"not_any_\": [\"SCHEDULED\"] } }
    }
  }"
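The backslash-escaped JSON in these commands is easy to get wrong by hand. One alternative, sketched here, is to let Python serialize the body and pipe it to curl. build_payload is a hypothetical helper, the fallback timestamp is just the example date used earlier in this thread, and the filter keys are copied from the commands above rather than verified independently.

```python
import json
import os


def build_payload(since, exclude_types=("SCHEDULED",), page=1, limit=200):
    """Return the paginate body: updated since a timestamp, minus excluded state types."""
    return {
        "page": page,
        "limit": limit,
        "flow_runs": {
            "updated": {"since": since},
            "state": {"type": {"not_any_": list(exclude_types)}},
        },
    }


if __name__ == "__main__":
    # SINCE is expected to be exported by the shell, as in the commands above.
    print(json.dumps(build_payload(os.environ.get("SINCE", "2025-08-11T00:00:00Z"))))
```

Saved as, say, build_payload.py, it can feed curl through stdin after exporting SINCE: python3 build_payload.py | curl -X POST "http://127.0.0.1:4200/api/flow_runs/paginate" -H "Content-Type: application/json" -d @-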
Tiny Python exporter (NDJSON), excludes SCHEDULED
#!/usr/bin/env python3
import os, sys, json, requests
from datetime import datetime, timedelta, timezone

BASE_URL = os.getenv("PREFECT_API_URL", "http://127.0.0.1:4200/api")
API_KEY = os.getenv("PREFECT_API_KEY")
PAGE_SIZE = int(os.getenv("PAGE_SIZE", "200"))
WINDOW = os.getenv("WINDOW", "updated")  # created|start_time|updated
since = (datetime.now(timezone.utc) - timedelta(days=7)).replace(microsecond=0).isoformat().replace("+00:00", "Z")

headers = {"Content-Type": "application/json"}
if "api.prefect.cloud" in BASE_URL and API_KEY:
    headers["Authorization"] = f"Bearer {API_KEY}"

page, total = 1, 0
while True:
    payload = {
        "page": page,
        "limit": PAGE_SIZE,
        "flow_runs": {
            WINDOW: {"since": since},
            "state": {"type": {"not_any_": ["SCHEDULED"]}}
        }
    }
    r = requests.post(f"{BASE_URL}/flow_runs/paginate", headers=headers, json=payload, timeout=30)
    if r.status_code != 200:
        print(f"Error {r.status_code}: {r.text}", file=sys.stderr)
        sys.exit(1)
    data = r.json()
    results = data.get("results", [])
    if not results:
        break
    for fr in results:
        print(json.dumps(fr))
        total += 1
    pages = data.get("pages")
    if pages is not None and page >= pages:
        break
    page += 1
print(f"# Exported {total} flow runs since {since} by {WINDOW}, excluding SCHEDULED", file=sys.stderr)
Docs: Paginate Flow Runs
If you prefer “created in last 7d” or “started in last 7d,” set WINDOW to created or start_time. I won’t be offended.