ajones
09/09/2024, 9:13 PMMarvin
09/09/2024, 9:13 PMajones
09/09/2024, 9:13 PMajones
09/09/2024, 9:15 PMMarvin
09/09/2024, 9:15 PM`get_client` from `prefect`
- Filter and schema classes from `prefect.client.schemas`
2. Filter flow runs by state:
Use `FlowRunFilterState` and `FlowRunFilterStateName` to filter for running flows.
Here's a basic example:
python
import asyncio

from prefect import get_client
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterState,
    FlowRunFilterStateName,
)


async def get_running_flows():
    # Open a client against the API configured in the active Prefect profile
    async with get_client() as client:
        running_flows = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                # Match flow runs whose state name is "Running"
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=["Running"])
                )
            )
        )
    return running_flows


# To get and print running flows:
running_flows = asyncio.run(get_running_flows())
for flow in running_flows:
    print(f"Flow ID: {flow.id}, Start Time: {flow.start_time}")
This will provide you with the list of currently running flows along with their start times. For more details, you can check the Prefect API docs.
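As a follow-up to the example above, here is a minimal sketch of turning those start times into elapsed run times, for instance to spot runs that have been going unusually long. `elapsed_minutes` is a hypothetical helper, not part of Prefect; it only assumes each start time is a timezone-aware `datetime` (or `None` for a run that has not started yet).

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def elapsed_minutes(start_time: Optional[datetime], now: Optional[datetime] = None) -> Optional[float]:
    """Return minutes since start_time, or None if the run has no start time."""
    if start_time is None:
        return None
    now = now or datetime.now(timezone.utc)
    return (now - start_time).total_seconds() / 60


# Usage with fixed, made-up timestamps so the result is reproducible
started = datetime(2024, 9, 9, 21, 0, tzinfo=timezone.utc)
checked = started + timedelta(minutes=15)
print(elapsed_minutes(started, now=checked))  # prints 15.0
```

In the loop above, this could be called as `elapsed_minutes(flow.start_time)`.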
Hope this helps! 🚀ajones
09/09/2024, 9:18 PMMarvin
09/09/2024, 9:19 PMrequests
) to send the request.
2. Filter for running flows:
- Make use of the `/flow_runs/filter` endpoint.
Here’s an example script you can use in your GitHub Actions job:
python
import requests

# Replace with your Prefect API key and UI endpoint
API_KEY = "your-prefect-api-key"
PREFECT_API_URL = "https://api.prefect.io"


def get_running_flows(api_key, api_url):
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    filters = {
        "flow_runs": {
            "state": {"type": {"any_": ["RUNNING"]}}
        }
    }
    response = requests.post(
        f"{api_url}/graphql",
        json={"query": """
            query ($filters: flow_run_filter!) {
                flow_runs(where: $filters) {
                    id
                    start_time
                }
            }
        """, "variables": {"filters": filters}},
        headers=headers
    )
    response.raise_for_status()  # Raise an error for bad status codes
    return response.json()["data"]["flow_runs"]


if __name__ == "__main__":
    running_flows = get_running_flows(API_KEY, PREFECT_API_URL)
    for flow in running_flows:
        print(f"Flow ID: {flow['id']}, Start Time: {flow['start_time']}")
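The `/flow_runs/filter` endpoint mentioned above is a REST route, not part of a GraphQL schema, so the filter dictionary would be sent as the JSON body of a POST to that route. The sketch below shows one way to build that request. It assumes a Prefect Cloud workspace URL of the form `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`; the IDs, the key, and the helper name `build_filter_request` are placeholders rather than anything from this thread. Nothing is sent over the network unless the commented-out `requests.post` call is enabled.

```python
def build_filter_request(api_url: str, api_key: str) -> dict:
    """Build the URL, headers, and JSON body for POST {api_url}/flow_runs/filter."""
    return {
        "url": f"{api_url.rstrip('/')}/flow_runs/filter",
        "headers": {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        # Same filter shape as in the script above: state type is RUNNING
        "json": {"flow_runs": {"state": {"type": {"any_": ["RUNNING"]}}}},
    }


# Placeholder workspace URL and key (assumptions, replace with real values)
API_URL = "https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>"
request_kwargs = build_filter_request(API_URL, "your-prefect-api-key")
print(request_kwargs["url"])

# To actually send it (requires `pip install requests` and valid credentials):
# import requests
# response = requests.post(**request_kwargs, timeout=30)
# response.raise_for_status()
# for run in response.json():  # assumed: the route returns a JSON list of flow runs
#     print(run["id"], run["start_time"])
```

Keeping the request construction separate from the network call makes it possible to inspect the URL and body before any credentials are used.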
### Steps to use this in GitHub Actions:
1. Create a new script file (e.g., `get_running_flows.py`) with the code above.
2. Add the necessary secrets to your GitHub repository (Settings > Secrets and variables > Actions), such as `PREFECT_API_KEY`.
3. Set up your GitHub Actions workflow (e.g., `.github/workflows/get_running_flows.yml`):
yaml
name: Get Running Prefect Flows
on: [push]
jobs:
  get-flows:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.x'
      - name: Install requests
        run: pip install requests
      - name: Get Running Flows
        env:
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
        run: python get_running_flows.py
Replace `your-prefect-api-key` and `https://api.prefect.io` with your actual API key and Prefect API URL, respectively.
For more details, you can refer to the Prefect API documentation.
Hope this helps! 🚀ajones
09/10/2024, 10:28 PMTraceback (most recent call last):
  File "/Users/ajones/GitHub/dbt-models/.github/prefect_test_2.py", line 36, in <module>
    running_flows = get_running_flows(API_KEY, PREFECT_API_URL)
  File "/Users/ajones/GitHub/dbt-models/.github/prefect_test_2.py", line 32, in get_running_flows
    response.raise_for_status() # Raise an error for bad status codes
  File "/Users/ajones/GitHub/dbt-models/dbt_env/lib/python3.9/site-packages/requests/models.py", line 1024, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://api.prefect.io/graphql
Marvin
09/10/2024, 10:29 PMpython
import requests

# Replace with your Prefect API key and UI endpoint
API_KEY = "your-prefect-api-key"
PREFECT_API_URL = "https://api.prefect.io"


def get_running_flows(api_key, api_url):
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    query = """
    query {
        flow_runs(where: {state: {_eq: "RUNNING"}}) {
            id
            start_time
        }
    }
    """
    response = requests.post(
        f"{api_url}/graphql",
        json={"query": query},
        headers=headers
    )
    print("Request Payload:")
    print(response.request.body)
    try:
        response.raise_for_status()  # Raise an error for bad status codes
    except requests.exceptions.HTTPError as e:
        print(f"Error Response: {response.text}")
        raise e
    return response.json()["data"]["flow_runs"]


if __name__ == "__main__":
    running_flows = get_running_flows(API_KEY, PREFECT_API_URL)
    for flow in running_flows:
        print(f"Flow ID: {flow['id']}, Start Time: {flow['start_time']}")
### Explanation
- Correct GraphQL Query: Fixed the query structure to properly filter for running states.
- Debug Output: Added debug prints to help you understand what’s being sent and received.
### GitHub Actions Workflow
Make sure your GitHub Actions job passes the API key to the script and runs it:
yaml
name: Get Running Prefect Flows
on: [push]
jobs:
  get-flows:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.x'
      - name: Install requests
        run: pip install requests
      - name: Get Running Flows
        env:
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
        run: python get_running_flows.py
Ensure you have added the `PREFECT_API_KEY` secret in the GitHub repository settings.
### More Details
For more specifics about the Prefect GraphQL API, you can explore the API documentation.
Hope this helps you resolve the issue! 🚀ajones
09/10/2024, 10:41 PM(dbt_env) (base) ajones@MacBookWPQLC4NF dbt-models % python .github/prefect_test_2.py
Request Payload:
b'{"query": "\\n query {\\n flow_runs(where: {state: {_eq: \\"RUNNING\\"}}) {\\n id\\n start_time\\n }\\n }\\n "}'
Error Response: {"errors":[{"message":"Cannot query field \"flow_runs\" on type \"Query\". Did you mean \"flow_run\", \"flow_group\", \"flow_by_pk\", \"flow_run_state\", or \"flow_group_sla\"?","extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}]}
Traceback (most recent call last):
  File "/Users/ajones/GitHub/dbt-models/.github/prefect_test_2.py", line 41, in <module>
    running_flows = get_running_flows(API_KEY, PREFECT_API_URL)
  File "/Users/ajones/GitHub/dbt-models/.github/prefect_test_2.py", line 36, in get_running_flows
    raise e
  File "/Users/ajones/GitHub/dbt-models/.github/prefect_test_2.py", line 33, in get_running_flows
    response.raise_for_status() # Raise an error for bad status codes
  File "/Users/ajones/GitHub/dbt-models/dbt_env/lib/python3.9/site-packages/requests/models.py", line 1024, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://api.prefect.io/graphql
ajones
09/10/2024, 10:42 PMRequest Payload:
b'{"query": "\\n query {\\n flow_runs(where: {state: {_eq: \\"RUNNING\\"}}) {\\n id\\n start_time\\n }\\n }\\n "}'
Error Response: {"errors":[{"message":"Cannot query field \"flow_runs\" on type \"Query\". Did you mean \"flow_run\", \"flow_group\", \"flow_by_pk\", \"flow_run_state\", or \"flow_group_sla\"?","extensions":{"code":"GRAPHQL_VALIDATION_FAILED"}}]}
Marvin
09/10/2024, 10:42 PM`flow_runs` does not exist. It should be `flow_run` instead. Let me correct the GraphQL query.
Here’s the updated script:
python
import requests

# Replace with your Prefect API key and UI endpoint
API_KEY = "your-prefect-api-key"
PREFECT_API_URL = "https://api.prefect.io"


def get_running_flows(api_key, api_url):
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    query = """
    query {
        flow_run(where: {state: {_eq: "RUNNING"}}) {
            id
            start_time
        }
    }
    """
    response = requests.post(
        f"{api_url}/graphql",
        json={"query": query},
        headers=headers
    )
    print("Request Payload:")
    print(response.request.body)
    try:
        response.raise_for_status()  # Raise an error for bad status codes
    except requests.exceptions.HTTPError as e:
        print(f"Error Response: {response.text}")
        raise e
    return response.json()["data"]["flow_run"]


if __name__ == "__main__":
    running_flows = get_running_flows(API_KEY, PREFECT_API_URL)
    for flow in running_flows:
        print(f"Flow ID: {flow['id']}, Start Time: {flow['start_time']}")
### Key Changes:
1. Correct GraphQL Query:
- Changed `flow_runs` to `flow_run`.
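The validation error above already lists the field names the schema does accept. As a small illustration, the sketch below pulls those suggestions out of the error message with a regular expression. `suggested_fields` is a hypothetical helper, and the message format is assumed to match the one quoted earlier in this thread.

```python
import re


def suggested_fields(message: str) -> list:
    """Return the field names listed after 'Did you mean' in a GraphQL validation message."""
    _, _, tail = message.partition("Did you mean")
    return re.findall(r'"([^"]+)"', tail)


# Error message copied from the response earlier in this thread
message = (
    'Cannot query field "flow_runs" on type "Query". Did you mean "flow_run", '
    '"flow_group", "flow_by_pk", "flow_run_state", or "flow_group_sla"?'
)
print(suggested_fields(message))
```

A message without a "Did you mean" clause yields an empty list.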
### GitHub Actions Workflow:
Ensure your GitHub Actions job looks like this:
yaml
name: Get Running Prefect Flows
on: [push]
jobs:
  get-flows:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.x'
      - name: Install requests
        run: pip install requests
      - name: Get Running Flows
        env:
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
        run: python get_running_flows.py
You should be all set. The request should now correctly query the API for running flows. For more details on querying Prefect’s GraphQL API, you can refer to the API documentation.
Let me know if you encounter any more issues! 🚀ajones
09/10/2024, 10:50 PM(dbt_env) (base) ajones@MacBookWPQLC4NF dbt-models % python .github/prefect_test_2.py
Request Payload:
b'{"query": "\\n query {\\n flow_run(where: {state: {_eq: \\"RUNNING\\"}}) {\\n id\\n start_time\\n }\\n }\\n "}'
Traceback (most recent call last):
  File "/Users/ajones/GitHub/dbt-models/.github/prefect_test_2.py", line 41, in <module>
    running_flows = get_running_flows(API_KEY, PREFECT_API_URL)
  File "/Users/ajones/GitHub/dbt-models/.github/prefect_test_2.py", line 38, in get_running_flows
    return response.json()["data"]["flow_run"]
TypeError: 'NoneType' object is not subscriptable
Marvin
09/10/2024, 10:50 PMpython
import requests

# Replace with your Prefect API key and UI endpoint
API_KEY = "your-prefect-api-key"
PREFECT_API_URL = "https://api.prefect.io"  # Use the correct URL if it is different


def get_running_flows(api_key, api_url):
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    query = """
    query {
        flow_run(where: {state: {_eq: "Running"}}) {
            id
            start_time
        }
    }
    """
    response = requests.post(
        f"{api_url}/graphql",
        json={"query": query},
        headers=headers
    )
    # Debugging: Print request and response
    print("Request Payload:")
    print(response.request.body)
    try:
        response.raise_for_status()  # Raise an error for bad status codes
    except requests.exceptions.HTTPError as e:
        print(f"Error Response: {response.text}")
        raise e
    response_json = response.json()
    if 'errors' in response_json:
        print(f"GraphQL Errors: {response_json['errors']}")
        raise Exception("Errors encountered during GraphQL query.")
    return response_json.get("data", {}).get("flow_run", [])


if __name__ == "__main__":
    running_flows = get_running_flows(API_KEY, PREFECT_API_URL)
    for flow in running_flows:
        print(f"Flow ID: {flow['id']}, Start Time: {flow['start_time']}")
### Key Changes:
1. State Filter Fix:
- Ensure the state is `Running` (case-sensitive).
2. More Debugging:
- Print the response JSON to catch any GraphQL-specific errors.
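The earlier `TypeError: 'NoneType' object is not subscriptable` is consistent with a GraphQL response whose `data` field is `null`, which servers commonly return alongside an `errors` list even when the HTTP status is 200. A minimal sketch of a parser that handles both cases is below. `extract_flow_runs` and `GraphQLQueryError` are illustrative names, not part of `requests` or Prefect.

```python
class GraphQLQueryError(Exception):
    """Raised when a GraphQL response carries an `errors` list."""


def extract_flow_runs(payload: dict) -> list:
    """Return the `flow_run` rows from a decoded GraphQL response body."""
    errors = payload.get("errors")
    if errors:
        # Surface the server's messages instead of failing later on data=None
        raise GraphQLQueryError("; ".join(e.get("message", "") for e in errors))
    # `data` can be present but null, so fall back to an empty dict
    data = payload.get("data") or {}
    return data.get("flow_run") or []


# Made-up payloads for illustration
print(extract_flow_runs({"data": {"flow_run": [{"id": "abc", "start_time": None}]}}))
print(extract_flow_runs({"data": None}))
```

Using `payload.get("data") or {}` matters here because `dict.get` only applies its default when the key is missing, not when the value is `None`.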
### GitHub Actions Workflow:
Keep using the same GitHub Actions job setup as before:
yaml
name: Get Running Prefect Flows
on: [push]
jobs:
  get-flows:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.x'
      - name: Install requests
        run: pip install requests
      - name: Get Running Flows
        env:
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
        run: python get_running_flows.py
If this still doesn’t solve the issue, please provide details of the response output, and I’ll help you debug further. For precise GraphQL API usage, you might want to refer to the Prefect API documentation.
Keep me posted on how it goes! 🚀ajones
09/10/2024, 10:53 PM(dbt_env) (base) ajones@MacBookWPQLC4NF dbt-models % python .github/prefect_test_2.py
Request Payload:
b'{"query": "\\n query {\\n flow_run(where: {state: {_eq: \\"Running\\"}}) {\\n id\\n start_time\\n }\\n }\\n "}'
GraphQL Errors: [{'path': ['flow_run'], 'message': 'AuthenticationError: Forbidden', 'extensions': {'code': 'UNAUTHENTICATED'}}]
Traceback (most recent call last):
  File "/Users/ajones/GitHub/dbt-models/.github/prefect_test_2.py", line 47, in <module>
    running_flows = get_running_flows(API_KEY, PREFECT_API_URL)
  File "/Users/ajones/GitHub/dbt-models/.github/prefect_test_2.py", line 42, in get_running_flows
    raise Exception("Errors encountered during GraphQL query.")
Exception: Errors encountered during GraphQL query.
Marvin
09/10/2024, 10:53 PM`AuthenticationError: Forbidden` suggests that there's an issue with the authentication. Let's ensure the API key is correctly set and used.
Here's an approach to verify and handle authentication correctly:
1. Verify API Key:
- Ensure the API key has the appropriate permissions.
- Make sure the API key is correctly set in the environment.
2. Set Authentication in Headers:
- Ensure the `Authorization` header is correctly formatted.
### Revised Script with Authentication Check:
Here’s the updated script with further checks:
python
import os

import requests

# Load API key from environment for better security practice
API_KEY = os.getenv("PREFECT_API_KEY")
PREFECT_API_URL = "https://api.prefect.io"  # Ensure this is the correct endpoint


def get_running_flows(api_key, api_url):
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    query = """
    query {
        flow_run(where: {state: {_eq: "Running"}}) {
            id
            start_time
        }
    }
    """
    response = requests.post(
        f"{api_url}/graphql",
        json={"query": query},
        headers=headers
    )
    # Debugging: Print request and response
    print("Request Headers:")
    print(headers)
    print("Request Payload:")
    print(response.request.body)
    try:
        response.raise_for_status()  # Raise an error for bad status codes
    except requests.exceptions.HTTPError as e:
        print(f"Error Response: {response.text}")
        raise e
    response_json = response.json()
    if 'errors' in response_json:
        print(f"GraphQL Errors: {response_json['errors']}")
        raise Exception("Errors encountered during GraphQL query.")
    return response_json.get("data", {}).get("flow_run", [])


if __name__ == "__main__":
    if not API_KEY:
        raise Exception("API key not found. Ensure PREFECT_API_KEY environment variable is set.")
    running_flows = get_running_flows(API_KEY, PREFECT_API_URL)
    for flow in running_flows:
        print(f"Flow ID: {flow['id']}, Start Time: {flow['start_time']}")
### GitHub Actions Workflow:
Ensure your GitHub Actions workflow properly sets the environment variable for the API key:
yaml
name: Get Running Prefect Flows
on: [push]
jobs:
  get-flows:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.x'
      - name: Install requests
        run: pip install requests
      - name: Get Running Flows
        env:
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
        run: python .github/prefect_test_2.py
### Ensure the Prefect API Key is Correct:
- Verify the API key you are using has the necessary permissions (likely Read access to flow runs).
- Check the key is correctly set as a GitHub Secret and is being passed to the script.
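One caution about the debug output in the script above: printing the request headers as-is writes the full API key to the terminal or CI log. A minimal sketch of masking it first is below. `redact_headers` is a hypothetical helper, and the choice to keep the last four characters is arbitrary.

```python
def redact_headers(headers: dict, keep: int = 4) -> dict:
    """Return a copy of headers with the Authorization token masked."""
    redacted = dict(headers)
    value = redacted.get("Authorization")
    if value:
        scheme, sep, token = value.partition(" ")
        if not sep:
            # No "Bearer " style prefix: treat the whole value as the secret
            scheme, token = "", value
        # Keep only the last `keep` characters of the token for identification
        visible = token[-keep:] if len(token) > keep else ""
        redacted["Authorization"] = f"{scheme} ***{visible}".strip()
    return redacted


# Made-up key for illustration
headers = {"Authorization": "Bearer example-key-1234", "Content-Type": "application/json"}
print("Request Headers:")
print(redact_headers(headers))
```

The original `headers` dictionary is left unchanged, so it can still be passed to `requests.post`.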
### Prefect API Documentation:
Refer to the Prefect API Client documentation for further details on authentication and other queries.
Try running the script locally to ensure it's correctly pulling the environment variable and making successful authenticated requests. Let me know how it goes! 🚀ajones
09/10/2024, 11:07 PMMarvin
09/10/2024, 11:08 PMpython
import os

import requests

# Load API key from environment for better security practice
API_KEY = os.getenv("PREFECT_API_KEY")
PREFECT_CLOUD_API_URL = "https://api.prefect.io"  # Prefect Cloud API endpoint


def get_running_flows(api_key, api_url):
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    # The field is `flow_run`; this API rejected `flow_runs` earlier in the thread
    query = """
    query {
        flow_run(where: {state: {_eq: "Running"}}) {
            id
            start_time
        }
    }
    """
    response = requests.post(
        f"{api_url}/graphql",
        json={"query": query},
        headers=headers
    )
    # Debugging: Print request and response
    print("Request Headers:")
    print(headers)
    print("Request Payload:")
    print(response.request.body)
    try:
        response.raise_for_status()  # Raise an error for bad status codes
    except requests.exceptions.HTTPError as e:
        print(f"Error Response: {response.text}")
        raise e
    response_json = response.json()
    if 'errors' in response_json:
        print(f"GraphQL Errors: {response_json['errors']}")
        raise Exception("Errors encountered during GraphQL query.")
    return response_json.get("data", {}).get("flow_run", [])


if __name__ == "__main__":
    if not API_KEY:
        raise Exception("API key not found. Ensure PREFECT_API_KEY environment variable is set.")
    running_flows = get_running_flows(API_KEY, PREFECT_CLOUD_API_URL)
    for flow in running_flows:
        print(f"Flow ID: {flow['id']}, Start Time: {flow['start_time']}")
### GitHub Actions Workflow:
Ensure your GitHub Actions workflow sets the environment variable properly:
yaml
name: Get Running Prefect Flows
on: [push]
jobs:
  get-flows:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.x'
      - name: Install requests
        run: pip install requests
      - name: Get Running Flows
        env:
          PREFECT_API_KEY: ${{ secrets.PREFECT_API_KEY }}
        run: python .github/prefect_test_2.py
### Ensure the API Key:
- Verify your Prefect Cloud API key is correctly set up and has sufficient permissions.
- Confirm the API key is correctly added to your GitHub repository secrets.
### Troubleshooting:
- Run the script locally first to ensure it pulls the API key from the environment and authenticates properly.
- Print detailed headers and response content during debugging.
For more on Prefect Cloud API usage and authentication, refer to the Prefect Cloud API documentation.
Let me know if this resolves the issue or if you encounter any further problems! 🚀