Thomas Gerardu
07/31/2024, 12:54 PMMarvin
07/31/2024, 12:54 PMpython
import datetime
from prefect import Client
# Initialize the Prefect client
client = Client()
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Query flow runs within the specified time window
query = {
"start_time": {"start": start_time.isoformat(), "end": end_time.isoformat()}
}
# Retrieve successful flow runs
successful_runs = client.get("/flow_runs", params={"state": "Success", **query})
successful_count = len(successful_runs)
# Retrieve failed flow runs
failed_runs = client.get("/flow_runs", params={"state": "Failed", **query})
failed_count = len(failed_runs)
# Output the results
print(f"Successful runs: {successful_count}")
print(f"Failed runs: {failed_count}")
### Documentation Links:
- Client Usage: Using the Client
- Prefect API Reference: API Reference
If you have further questions, feel free to ask!Thomas Gerardu
07/31/2024, 12:57 PMMarvin
07/31/2024, 12:59 PMpython
import datetime
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterStateName
# Initialize the Prefect client
async def main():
async with get_client() as client:
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Prepare the filters for time window and flow UUID
filters = FlowRunFilter(
flow_id={"any_": [flow_id]},
start_time={"after_": start_time, "before_": end_time}
)
# Retrieve successful flow runs
successful_runs = await client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=["Completed"])
)
successful_count = len(successful_runs)
# Retrieve failed flow runs
failed_runs = await client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=["Failed"])
)
failed_count = len(failed_runs)
# Output the results
print(f"Successful runs: {successful_count}")
print(f"Failed runs: {failed_count}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
### Key Points:
1. Flow UUID: Replace "your-flow-uuid-here"
with the actual UUID of the flow.
2. Filters: Added filters to specify the flow ID and the time window.
3. Async Client: Used get_client()
from prefect.client
for async operations.
### Documentation Links:
- Client Usage: Using the Client
- API Reference: API Reference
For detailed implementation, you can refer to the documentation linked above.Thomas Gerardu
07/31/2024, 1:06 PMMarvin
07/31/2024, 1:06 PMpython
import datetime
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterStateName
async def main():
async with get_client() as client:
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Filter flow runs by date and flow UUID
filters = FlowRunFilter(
flow_id={"any_": [flow_id]},
start_time={"after_": start_time, "before_": end_time}
)
# Define states to filter and collect counts
states = ["Completed", "Failed", "Crashing", "TimedOut"]
counts = {}
durations = []
for state in states:
flow_runs = await client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=[state])
)
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.start_time
end = run.end_time
if start and end:
durations.append((end - start).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashing', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
### Key Points:
1. Flow UUID: Replace "your-flow-uuid-here"
with the actual UUID of the flow.
2. States: The script checks for Completed
, Failed
, Crashing
, and TimedOut
states.
3. Durations Calculation: It calculates the duration of each flow run to find the max and min durations.
### Documentation Links:
- Client Usage: Using the Client
- API Reference: API Reference
Feel free to replace the UUID and run the script. If you need further enhancements or have other questions, let me know!Thomas Gerardu
07/31/2024, 2:03 PMMarvin
07/31/2024, 2:03 PMbash
export PREFECT_API_URL="<https://prefect2.lizard.net/api>"
### Direct Client Configuration:
Alternatively, you can configure the client directly in your script:
python
import datetime
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterStateName
async def main():
# Set up the client to use the custom server
async with get_client(api_url="<https://prefect2.lizard.net/api>") as client:
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Filter flow runs by date and flow UUID
filters = FlowRunFilter(
flow_id={"any_": [flow_id]},
start_time={"after_": start_time, "before_": end_time}
)
# Define states to filter and collect counts
states = ["Completed", "Failed", "Crashing", "TimedOut"]
counts = {}
durations = []
for state in states:
flow_runs = await client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=[state])
)
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.start_time
end = run.end_time
if start and end:
durations.append((end - start).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashing', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
### Summary:
1. Set the PREFECT_API_URL
environment variable or configure the API URL in the script.
2. Replace "your-flow-uuid-here"
with the actual flow UUID.
3. Ensure that your server at <https://prefect2.lizard.net/>
is properly set up and accessible.
If everything is set up correctly, the script should work on your server. If you encounter any issues or need further adjustments, let me know!Thomas Gerardu
07/31/2024, 2:48 PMMarvin
07/31/2024, 2:48 PMThomas Gerardu
07/31/2024, 2:49 PMMarvin
07/31/2024, 2:50 PMpython
import datetime
from prefect.client import get_client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterStateName
async def main():
# Set up the client to use the custom server
async with get_client(api_url="<https://prefect2.lizard.net/api>") as client:
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Define filters for the flow runs
filters = FlowRunFilter(
flow_id={"any_": [flow_id]},
start_time={"after_": start_time, "before_": end_time}
)
# States to filter and count
states = ["Completed", "Failed", "Crashing", "TimedOut"]
counts = {}
durations = []
for state in states:
flow_runs = await client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=[state])
)
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.start_time
end = run.end_time
if start and end:
durations.append((end - start).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashing', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
### Key Points:
1. Flow UUID: Ensure you're using the correct UUID for the flow.
2. Flow Runs: The script retrieves runs associated with the given flow UUID.
3. States: Counts are calculated for Completed
, Failed
, Crashing
, and TimedOut
states.
4. Duration Calculation: Durations are calculated and the max/min durations are determined.
### Additional Resources:
- Using the Prefect Client
- Prefect API Reference
This script will give you a detailed overview of the flow runs associated with a specific flow. If you have more questions or need further modifications, let me know!Thomas Gerardu
07/31/2024, 2:51 PMMarvin
07/31/2024, 2:51 PMpython
import datetime
from prefect.client import Client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterStateName
def main():
# Set up the client to use the custom server
client = Client(api_url="<https://prefect2.lizard.net/api>")
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Define filters for the flow runs
filters = FlowRunFilter(
flow_id={"any_": [flow_id]},
start_time={"after_": start_time, "before_": end_time}
)
# States to filter and count
states = ["Completed", "Failed", "Crashed", "TimedOut"]
counts = {}
durations = []
for state in states:
flow_runs = client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=[state])
)
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.start_time
end = run.end_time
if start and end:
durations.append((end - start).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashed', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
main()
### Key Changes:
1. Synchronous Client: Used the synchronous Client
instead of get_client()
.
2. Function Implementation: Defined main()
as a synchronous function.
3. State Handling: Adjusted the loop to handle states and filter flow runs synchronously.
### Key Points:
1. Replace "your-flow-uuid-here"
with the actual UUID of the flow.
2. Ensure your server at <https://prefect2.lizard.net/>
is properly set up and accessible.
If you encounter any issues or need further help, feel free to ask!Thomas Gerardu
07/31/2024, 2:52 PMMarvin
07/31/2024, 2:52 PMPREFECT_API_KEY
environment variable or pass the key directly when initializing the client.
### Example Script with Authentication via API Key:
python
import datetime
import os
from prefect.client import Client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterStateName
def main():
# Set up the client to use the custom server
api_key = os.getenv("PREFECT_API_KEY", "your-api-key-here") # Fetch from env variable or set directly
client = Client(api_url="<https://prefect2.lizard.net/api>", api_key=api_key)
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Define filters for the flow runs
filters = FlowRunFilter(
flow_id={"any_": [flow_id]},
start_time={"after_": start_time, "before_": end_time}
)
# States to filter and count
states = ["Completed", "Failed", "Crashed", "TimedOut"]
counts = {}
durations = []
for state in states:
flow_runs = client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=[state])
)
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.start_time
end = run.end_time
if start and end:
durations.append((end - start).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashed', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
main()
### Steps to Authenticate:
1. Generate an API Key: Obtain your API key from your Prefect Cloud account or your self-hosted Prefect Orion server.
2. Set Environment Variable (Optional): You can set the API key as an environment variable:
bash
export PREFECT_API_KEY="your-api-key-here"
3. Run Script: Execute the script, ensuring it uses the correct API key.
### References:
- Prefect Authentication
- Using Prefect Client
This script should work seamlessly whether you run it from your local machine or any other environment, provided you authenticate correctly. If you need further help, feel free to ask!Thomas Gerardu
07/31/2024, 2:54 PMMarvin
07/31/2024, 2:54 PMapi_url
. You can simply instantiate the Client
without additional parameters.
### Example Script for Local Execution on the Server:
python
import datetime
from prefect.client import Client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterStateName
def main():
# Initialize the client for local execution
client = Client()
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Define filters for the flow runs
filters = FlowRunFilter(
flow_id={"any_": [flow_id]},
start_time={"after_": start_time, "before_": end_time}
)
# States to filter and count
states = ["Completed", "Failed", "Crashed", "TimedOut"]
counts = {}
durations = []
for state in states:
flow_runs = client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=[state])
)
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.start_time
end = run.end_time
if start and end:
durations.append((end - start).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashed', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
main()
### Key Points:
1. Local Client Initialization: Just use client = Client()
if the script is running on the same server where the Prefect API is hosted and accessible via default settings.
2. Flow UUID: Replace "your-flow-uuid-here"
with the actual flow UUID.
3. No API URL Required: Omitting the api_url
parameter will make the client connect to the default local endpoint.
### Summary:
- On the server where Prefect is hosted: Use Client()
without specifying the api_url
.
- On a different machine: Specify the api_url
and handle authentication if required.
If there are any specific conditions or configurations for your Prefect setup that might affect connectivity, you may still need to adjust the client initialization accordingly. If you need further assistance, let me know!Marvin
08/01/2024, 7:21 AMClient
with the appropriate api_url
and include Basic Auth credentials in the headers.
Below is an example of how you can achieve this:
### Example Script with Basic Auth:
python
import datetime
from prefect.client import Client
import requests
from requests.auth import HTTPBasicAuth
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterStateName
def main():
# Define Basic Auth credentials
username = "your-username"
password = "your-password"
# Define the API URL
api_url = "<https://prefect2.lizard.net/api>"
# Initialize the Prefect Client with custom headers for Basic Auth
client = Client(
api_url=api_url,
headers={
"Authorization": f"Basic {requests.auth._basic_auth_str(username, password)}"
}
)
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Define filters for the flow runs
filters = FlowRunFilter(
flow_id={"any_": [flow_id]},
start_time={"after_": start_time, "before_": end_time}
)
# States to filter and count
states = ["Completed", "Failed", "Crashed", "TimedOut"]
counts = {}
durations = []
for state in states:
flow_runs = client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=[state])
)
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.start_time
end = run.end_time
if start and end:
durations.append((end - start).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashed', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
main()
### Key Points:
1. HTTPBasicAuth: Utilized requests.auth._basic_auth_str
to encode Basic Auth credentials.
2. Headers: Added an Authorization
header to the Client
initialization.
3. Flow UUID and Time Window: Ensure you replace "your-flow-uuid-here"
and adjust the time window as needed.
### Steps:
1. Replace credentials: Substitute username
and password
with your actual credentials.
2. Configure Flow UUID: Replace "your-flow-uuid-here"
with the actual UUID of the flow you want to query.
3. Run Script: Execute the script from your device.
### References:
- Using Prefect Client
- Authentication
This script should work for accessing the Prefect API with Basic Auth from another network. If you encounter any issues or have further questions, let me know!Thomas Gerardu
08/01/2024, 7:24 AMMarvin
08/01/2024, 7:24 AMClient
import. This could be due to a version mismatch or an incorrect import path.
Starting from Prefect 2.0, the client code was moved under the prefect.client.orion
module. Let's update your script to use the correct import path and also ensure that you have the correct packages installed.
### Step-by-Step Solution
1. Check Installation:
Make sure you have the correct version of prefect
installed:
bash
pip install prefect
2. Correct Import:
Use the appropriate client import path which is compatible with your Prefect version. For Prefect 2.x:
### Updated Script
Here's the corrected script using the appropriate import paths:
python
import datetime
from prefect.client.orion import OrionClient
import requests
from requests.auth import HTTPBasicAuth
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterStateName
def main():
# Define Basic Auth credentials
username = "your-username"
password = "your-password"
# Define the API URL
api_url = "<https://prefect2.lizard.net/api>"
# Initialize the Prefect Client with custom headers for Basic Auth
client = OrionClient(
api_url=api_url,
headers={
"Authorization": f"Basic {requests.auth._basic_auth_str(username, password)}"
}
)
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Define filters for the flow runs
filters = FlowRunFilter(
flow_id={"any_": [flow_id]},
start_time={"after_": start_time, "before_": end_time}
)
# States to filter and count
states = ["Completed", "Failed", "Crashed", "TimedOut"]
counts = {}
durations = []
for state in states:
flow_runs = client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=[state])
)
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.start_time
end = run.end_time
if start and end:
durations.append((end - start).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashed', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
main()
### Check Prefect Version
You can also check the version of Prefect to make sure you're using Prefect 2.x:
bash
prefect version
If the version is indeed Prefect 2.x and the correct script is still causing an issue, please let me know!Thomas Gerardu
08/01/2024, 7:29 AMMarvin
08/01/2024, 7:29 AMOrionClient
resides under prefect.client
. Here's the corrected script:
### Corrected Script Using OrionClient
python
import datetime
from prefect.client import OrionClient
import requests
from requests.auth import HTTPBasicAuth
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterStateName
def main():
# Define Basic Auth credentials
username = "your-username"
password = "your-password"
# Define the API URL
api_url = "<https://prefect2.lizard.net/api>"
# Initialize the Prefect Client with custom headers for Basic Auth
headers = {
"Authorization": f"Basic {requests.auth._basic_auth_str(username, password)}"
}
client = OrionClient(api_url, headers=headers)
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Define filters for the flow runs
filters = FlowRunFilter(
flow_id={"any_": [flow_id]},
start_time={"after_": start_time, "before_": end_time}
)
# States to filter and count
states = ["Completed", "Failed", "Crashed", "TimedOut"]
counts = {}
durations = []
for state in states:
flow_runs = client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=[state])
)
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.start_time
end = run.end_time
if start and end:
durations.append((end - start).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashed', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
main()
### Key Points:
1. Headers Initialization: Create the headers
dictionary with Basic Auth.
2. OrionClient Initialization: Pass the api_url
and headers
to OrionClient
.
### Summary:
- URL and Auth: Make sure to replace "your-username"
, "your-password"
, and "your-flow-uuid-here"
with actual values.
- Client Instantiation: Correctly configure the client to point to the Prefect API with Basic Auth.
This script should now work for accessing the Prefect API from your device with Basic Auth. If you encounter any issues or need further assistance, feel free to ask!Thomas Gerardu
08/01/2024, 7:38 AMMarvin
08/01/2024, 7:38 AMprefect.client
module. Let's clarify the correct import paths and ensure we're using the appropriate methods for Prefect 2.19.4.
For Prefect 2.x, particularly 2.19.4, let's use the OrionClient
properly. It also appears I did not validate the import correctly in my last response. Let's correct that.
### Correct Usage for Prefect 2.19.4
In Prefect 2.x, you should use get_client
function from prefect.client
. There isn't a direct Client
class that should be used as seen in legacy documentation. Instead, we use get_client()
to acquire a client instance.
Here's a complete example with Basic Auth applied for accessing the Prefect API:
### Correct Script:
Firstly, we need to add a method to support the generation of the Authorization
header which isn't a standard feature of Prefect. Instead, we leverage httpx
to manage credentials.
python
import datetime
import httpx
from prefect.client.orion import get_client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterStateName
def main():
# Define Basic Auth credentials
username = "your-username"
password = "your-password"
# Define the API URL
api_url = "<https://prefect2.lizard.net/api>"
# Custom authentication header for Basic Auth
headers = {
"Authorization": f"Basic {httpx.Auth(username, password).auth}"
}
async def get_data():
# Initialize the Prefect Client with custom headers for Basic Auth
async with get_client(api_url=api_url, headers=headers) as client:
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1)
end_time = datetime.datetime(2023, 1, 31)
# Define filters for the flow runs
filters = FlowRunFilter(
flow_id={"any_": [flow_id]},
start_time={"after_": start_time, "before_": end_time}
)
# States to filter and count
states = ["Completed", "Failed", "Crashed", "TimedOut"]
counts = {}
durations = []
for state in states:
flow_runs = await client.read_flow_runs(
flow_run_filter=filters,
flow_run_filter_state_name=FlowRunFilterStateName(any_=[state])
)
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.start_time
end = run.end_time
if start and end:
durations.append((end - start).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashed', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
import asyncio
asyncio.run(get_data())
if __name__ == "__main__":
main()
### Key Points
1. Authentication Header:
- Use httpx.Auth
to create authentication headers because Prefect doesn’t directly support Basic Auth.
2. Client Usage:
- Use get_client
from prefect.client.orion
to acquire the client in an async context.
3. Async Context:
- The script uses asynchronous methods compatible with Prefect 2.x.Marvin
08/01/2024, 7:38 AM"your-username"
, "your-password"
, and "your-flow-uuid-here"
with the actual values. This setup should work seamlessly with your Prefect 2.19.4 and Basic Auth requirements.
If this approach still doesn't work, please provide the error details, and I'll assist further.Thomas Gerardu
08/01/2024, 8:15 AMMarvin
08/01/2024, 8:15 AMclient.orion
. Given your environment, the HTTPBasicAuth
with requests is straightforward and less prone to errors. We will handle the HTTP requests directly without relying on Prefect Client classes for now.
Here's a more direct approach using requests
to make API calls with Basic Auth:
### Example Using requests
python
import datetime
import requests
from requests.auth import HTTPBasicAuth
def main():
# Define Basic Auth credentials
username = "your-username"
password = "your-password"
# Define the API URL
api_url = "<https://prefect2.lizard.net/api/flow_runs/filter>"
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1).isoformat()
end_time = datetime.datetime(2023, 1, 31).isoformat()
# Define filters for the flow runs
filters = {
"flow": {"id": {"any_": [flow_id]}},
"start_time": {"after_": start_time, "before_": end_time}
}
# States to filter and count
states = ["Completed", "Failed", "Crashing", "TimedOut"]
counts = {}
durations = []
for state in states:
# Update filters with state
filters["state"] = {"name": {"any_": [state]}}
# Make API call with Basic Auth
response = <http://requests.post|requests.post>(api_url, json=filters, auth=HTTPBasicAuth(username, password))
response.raise_for_status() # Raise an error for bad responses
flow_runs = response.json()
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = datetime.datetime.fromisoformat(run["start_time"])
end = datetime.datetime.fromisoformat(run["end_time"])
if start and end:
durations.append((end - start).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashing', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
main()
### Key Points:
1. Requests Library: Using requests
for HTTP communications with Basic Auth.
2. API Endpoint: Communicating directly with the Prefect API endpoint for flow runs.
3. Filters: The JSON body contains filters for flow_id, start_time, end_time, and state.
4. Durations Calculation: Calculating max and min durations from the flow runs.
### Setup:
1. Replace Credentials: Substitute your-username
and your-password
with actual credentials.
2. Flow UUID: Replace your-flow-uuid-here
with the actual flow UUID.
This approach bypasses direct use of Prefect's OrionClient
and handles everything via standard HTTP requests, making it simpler to debug and handle authentication.
If this still doesn't work or if there are specific errors, please provide those details, and I'll adjust accordingly.Thomas Gerardu
08/01/2024, 8:17 AMMarvin
08/01/2024, 8:18 AMstart_time
or end_time
might not be in the correct format, or they might be None
. We should ensure proper handling of these fields and validate before conversion.
Let's add some checks to handle these cases gracefully.
### Updated Script with Error Handling:
python
import datetime
import requests
from requests.auth import HTTPBasicAuth
def main():
# Define Basic Auth credentials
username = "your-username"
password = "your-password"
# Define the API URL
api_url = "<https://prefect2.lizard.net/api/flow_runs/filter>"
# Define the UUID of the flow
flow_id = "your-flow-uuid-here"
# Define the time window
start_time = datetime.datetime(2023, 1, 1).isoformat()
end_time = datetime.datetime(2023, 1, 31).isoformat()
# Define filters for the flow runs
filters = {
"flow": {"id": {"any_": [flow_id]}},
"start_time": {"after_": start_time, "before_": end_time}
}
# States to filter and count
states = ["Completed", "Failed", "Crashing", "TimedOut"]
counts = {}
durations = []
for state in states:
# Update filters with state
filters["state"] = {"name": {"any_": [state]}}
# Make API call with Basic Auth
response = <http://requests.post|requests.post>(api_url, json=filters, auth=HTTPBasicAuth(username, password))
response.raise_for_status() # Raise an error for bad responses
flow_runs = response.json()
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.get("start_time")
end = run.get("end_time")
if start and end:
# Convert to datetime object
start_datetime = datetime.datetime.fromisoformat(start)
end_datetime = datetime.datetime.fromisoformat(end)
durations.append((end_datetime - start_datetime).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashing', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
main()
### Key Changes:
1. Error Handling: Used run.get("start_time")
and run.get("end_time")
to fetch values and check if they are valid before converting.
2. Validation: Added checks to ensure start_time
and end_time
are not None
.
This should fix the TypeError
. If the issue persists or you encounter other errors, please provide the specifics, and I'll help resolve them.Thomas Gerardu
08/01/2024, 8:35 AMMarvin
08/01/2024, 8:35 AMZ
for UTC).
### Updated Script for Consistency:
We'll update the script to make sure it aligns perfectly with your UI filters.
python
import datetime
import requests
from requests.auth import HTTPBasicAuth
def main():
# Define Basic Auth credentials
username = "your-username"
password = "your-password"
# Define the API URL
api_url = "<https://prefect2.lizard.net/api/flow_runs/filter>"
# Define the UUID of the flow
flow_id = "6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"
# Define the time window
start_time = "2024-07-24T22:00:00.000Z"
end_time = "2024-07-31T21:59:59.999Z"
# Define filters for the flow runs
filters = {
"flow": {"id": {"any_": [flow_id]}},
"start_time": {"after_": start_time, "before_": end_time}
}
# States to filter and count
states = ["Completed", "Failed", "Crashing", "TimedOut"]
counts = {}
durations = []
for state in states:
# Update filters with state
filters["state"] = {"name": {"any_": [state]}}
# Make API call with Basic Auth
response = <http://requests.post|requests.post>(api_url, json=filters, auth=HTTPBasicAuth(username, password))
response.raise_for_status() # Raise an error for bad responses
flow_runs = response.json()
counts[state] = len(flow_runs)
# Calculate duration for each flow run
for run in flow_runs:
start = run.get("start_time")
end = run.get("end_time")
if start and end:
# Convert to datetime object
start_datetime = datetime.datetime.fromisoformat(start.replace("Z", "+00:00"))
end_datetime = datetime.datetime.fromisoformat(end.replace("Z", "+00:00"))
durations.append((end_datetime - start_datetime).total_seconds())
# Calculate max and min durations
if durations:
max_duration = max(durations)
min_duration = min(durations)
else:
max_duration = min_duration = 0
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashing', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
print(f"Max duration (seconds): {max_duration}")
print(f"Min duration (seconds): {min_duration}")
if __name__ == "__main__":
main()
### Key Changes:
- Date-Time Consistency: Ensured date-time formats are in ISO 8601 with Z
for UTC.
- Flow ID and Filters: Matched flow ID and time filters exactly as specified in your URL.
### Verify API Filter Matching:
Ensure the API filters used in the script match those used by the Prefect UI. The UI might use additional filters not immediately visible. Here’s how to verify:
1. Inspect Network Calls:
- Open your browser’s Developer Tools (often F12 or right-click → Inspect).
- Go to the Network tab.
- Refresh the Prefect UI page showing the flow runs.
- Look for requests going to a similar endpoint, such as /api/flow_runs/filter
.
2. Check Request Payload:
- Click on one of the requests.
- Check the Request Payload section to see the exact filters being sent.
Based on this, adjust your script to match precisely the filters used by the UI.
If there are specific discrepancies or additional details in the filters that you notice, please let me know, and we can further refine the script to match the UI data exactly.Thomas Gerardu
08/01/2024, 8:37 AMMarvin
08/01/2024, 8:37 AMpython
import datetime
import requests
from requests.auth import HTTPBasicAuth
def main():
# Define Basic Auth credentials
username = "your-username"
password = "your-password"
# Define the API URL
api_url = "<https://prefect2.lizard.net/api/flow_runs/filter>"
# Define the UUID of the flow
flow_id = "6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"
# Define the time window
start_time = "2024-07-24T22:00:00.000Z"
end_time = "2024-07-31T21:59:59.999Z"
# Define filters for the flow runs
filters = {
"flow": {"id": {"any_": [flow_id]}},
"start_time": {"after_": start_time, "before_": end_time}
}
# States to filter and count
states = ["Completed", "Failed", "Crashing", "TimedOut"]
counts = {}
all_flow_runs = {}
for state in states:
# Update filters with state
filters["state"] = {"name": {"any_": [state]}}
# Log filters
print(f"Using filters: {filters}")
# Make API call with Basic Auth
response = <http://requests.post|requests.post>(api_url, json=filters, auth=HTTPBasicAuth(username, password))
# Log response status and content
print(f"Response Status: {response.status_code}")
if response.status_code != 200:
print(response.text)
response.raise_for_status() # Raise an error for bad responses
flow_runs = response.json()
all_flow_runs[state] = flow_runs
counts[state] = len(flow_runs)
print(f"Flow runs for state {state}: {counts[state]}")
# Output the results
print(f"Successful runs: {counts.get('Completed', 0)}")
print(f"Failed runs: {counts.get('Failed', 0)}")
print(f"Crashed runs: {counts.get('Crashing', 0)}")
print(f"Timed out runs: {counts.get('TimedOut', 0)}")
# Verifying some response content
for state, runs in all_flow_runs.items():
print(f"Sample {state} run: {runs[0] if runs else 'None'}")
if __name__ == "__main__":
main()
### Key Points:
1. Logging Filters: Print the filters being used for each state.
2. Logging Responses: Print the status code and response content if the status is not 200.
3. Logging Flow Runs: Log the number of flow runs for each state after filtering.
Run this script and carefully check the printed logs to ensure:
- The filters are being set correctly and matching your expectations.
- The API responses are what you'd expect (200 status code and correct JSON format).
- The flow run counts match what's shown in the UI.
### Next Steps:
1. If the filters are correct:
- The script should match the UI, indicating the original issue might be elsewhere.
2. If discrepancies remain:
- Provide the sample run information printed in the logs.
- Check for differences in the response content vs. UI expectations.
If the issues persist or the results still don't match the UI expectations, please share the detailed outputs and any further discrepancies observed. This will help refine the troubleshooting further.Thomas Gerardu
08/01/2024, 8:39 AMMarvin
08/01/2024, 8:39 AMrunfile('C:/Users/thomas.gerardu/Documents/DUMP/prefect_test.py', wdir='C:/Users/thomas.gerardu/Documents/DUMP')Using filters: {'flow': {'id': {'any_': ['6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3']}}, 'start_time': {'after_': '2024-07-24T220000.000Z', 'before_': '2024-07-31T215959.999Z'}, 'state': {'name': {'any_': ['Completed']}}} Response Status: 200 Flow runs for state Completed: 200 Using filters: {'flow': {'id': {'any_': ['6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3']}}, 'start_time': {'after_': '2024-07-24T220000.000Z', 'before_': '2024-07-31T215959.999Z'}, 'state': {'name': {'any_': ['Failed']}}} Response Status: 200 Flow runs for state Failed: 200 Using filters: {'flow': {'id': {'any_': ['6c339f45-7ca1-4fb9-a50
Marvin
08/01/2024, 8:40 AMUsing filters: {'flow': {'id': {'any_': ['6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3']}}, 'start_time': {'after_': '2024-07-24T220000.000Z', 'before_': '2024-07-31T215959.999Z'}, 'state': {'name': {'any_': ['Completed']}}}Response Status: 200 Flow runs for state Completed: 200 Using filters: {'flow': {'id': {'any_': ['6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3']}}, 'start_time': {'after_': '2024-07-24T220000.000Z', 'before_': '2024-07-31T215959.999Z'}, 'state': {'name': {'any_': ['Failed']}}} Response Status: 200 Flow runs for state Failed: 200 Using filters: {'flow': {'id': {'any_': ['6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3']}}, 'start_time': {'after_': '2024-07-24T220000.
Thomas Gerardu
08/01/2024, 8:48 AMMarvin
08/01/2024, 8:48 AM/api/flow_runs/filter
endpoint is likely expecting a POST request with a JSON body, constructing a valid request URL for a browser GET request isn't straightforward. To test in a browser, you'd typically need an endpoint that supports GET requests with query parameters.
To facilitate testing, you can use a REST client like Postman or Insomnia that allows you to more easily construct and send POST requests with JSON bodies and authentication.
If you still prefer to see the response in a browser, you may use tools like cURL to achieve this from the command line.
### Using cURL for Testing
cURL Command:
sh
curl -X POST "<https://prefect2.lizard.net/api/flow_runs/filter>" \
-u "your-username:your-password" \
-H "Content-Type: application/json" \
-d '{
"flow": {"id": {"any_": ["6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"]}},
"start_time": {"after_": "2024-07-24T22:00:00.000Z", "before_": "2024-07-31T21:59:59.999Z"},
"state": {"name": {"any_": ["Completed"]}}
}'
### Using Postman or Insomnia
1. Create a New Request:
- Set the method to POST
.
- Set the URL to <https://prefect2.lizard.net/api/flow_runs/filter>
.
2. Set Up Authentication:
- Choose Basic Auth and enter your username
and password
.
3. Set Headers:
- Add Content-Type: application/json
.
4. Set Body:
- Choose the raw option and set the body to JSON.
- Paste the following JSON:
json
{
"flow": {"id": {"any_": ["6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"]}},
"start_time": {"after_": "2024-07-24T22:00:00.000Z", "before_": "2024-07-31T21:59:59.999Z"},
"state": {"name": {"any_": ["Completed"]}}
}
### Explanation of Payload:
- `"flow": {"id": {"any_": ["6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"]}}`: Specifies the flow ID.
- `"start_time": {"after_": "2024-07-24T220000.000Z", "before_": "2024-07-31T215959.999Z"}`: Time window filter.
- `"state": {"name": {"any_": ["Completed"]}}`: State filter to retrieve completed runs.
This setup should help you test and view the response in tools that support HTTP requests rather than a browser since browsers alone can't easily handle such POST requests with JSON bodies.
### If Using Tools:
1. Inspect the Response: See the results directly in the tool.
2. Adjust Filters: Modify the JSON body to test different states or time windows.
### Validate Command-Line Requests
Sometimes, accessing server responses via command-line tools helps understand what data the API provides. Adjust filters and run requests to validate function.
If there's any specific aspect or a further request, feel free to ask!Thomas Gerardu
08/01/2024, 8:53 AMMarvin
08/01/2024, 8:53 AMpostman return this:{"exception_message":"Invalid request received.","exception_detail":[{"loc":["body",0],"msg":"Expecting value: line 1 column 1 (char 0)","type":"value_error.jsondecode","ctx":{"msg":"Expecting value","doc":"json\r\n {\r\n \"flow\": {\"id\": {\"any_\": [\"6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3\"]}},\r\n \"start_time\": {\"after_\": \"2024-07-24T220000.000Z\", \"before_\": \"2024-07-31T215959.999Z\"},\r\n \"state\": {\"name\": {\"any_\": [\"Completed\"]}}\r\n }\r\n ","pos":0,"lineno":1,"colno":1}}],"request_body":"json\r\n {\r\n \"flow\": {\"id\": {\"any_\": [\"6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3\"]}},\r\n \"start_time\": {\"after_\": \"2024-07-24T220000.000Z\", \"before_\": \"2024-07-31T21
Marvin
08/01/2024, 8:55 AMjson
{
"flow": {"id": {"any_": ["6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"]}},
"start_time": {"after_": "2024-07-24T22:00:00.000Z", "before_": "2024-07-31T21:59:59.999Z"},
"state": {"name": {"any_": ["Completed"]}}
}
### Steps in Postman:
1. Request Type: POST
2. URL: <https://prefect2.lizard.net/api/flow_runs/filter>
3. Authorization: Basic Auth with your username
and password
4. Headers:
- Key: Content-Type
- Value: application/json
5. Body:
- Select raw and ensure the format is set to JSON.
- Paste the clean JSON body above.
Ensuring the formatting is correct usually resolves parsing issues. Here’s a step-by-step reminder to re-check settings in Postman:
### Postman Step-by-Step:
1. Set Request Type to POST:
!Postman POST Request▾
Set URL▾
Basic Auth▾
Content-Type: application/json
header.
!Set Headers▾
Postman Body▾
cURL
in your command line to verify if the issue persists there:
cURL Command:
sh
curl -X POST "<https://prefect2.lizard.net/api/flow_runs/filter>" \
-u "your-username:your-password" \
-H "Content-Type: application/json" \
-d '{"flow": {"id": {"any_": ["6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"]}}, "start_time": {"after_": "2024-07-24T22:00:00.000Z", "before_": "2024-07-31T21:59:59.999Z"}, "state": {"name": {"any_": ["Completed"]}}}'
### Debugging Tips:
1. Ensure no invisible characters in JSON body (copy-paste carefully).
2. Check request headers for correct content type and auth.
3. Verify authorization: Double-check correct username/password without extra spaces.
Re-try with the clean JSON body in Postman. If there are persistent issues, please share specific error messages, screenshots, or logs to provide further insights.Thomas Gerardu
08/01/2024, 1:25 PMMarvin
08/01/2024, 1:26 PM/flows/filter
rather than /flow_runs/filter
.
Let's adjust the approach to use the correct endpoint and structure the request properly.
### Using /flows/filter
Endpoint
cURL Example:
sh
curl -X POST "<https://prefect2.lizard.net/api/flows/filter>" \
-u "your-username:your-password" \
-H "Content-Type: application/json" \
-d '{
"id": {"any_": ["6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"]},
"start_time": {"after_": "2024-07-24T22:00:00.000Z", "before_": "2024-07-31T21:59:59.999Z"},
"state": {"name": {"any_": ["Completed"]}}
}'
### Correcting the Script for Postman
1. Set Request Type to POST
2. URL: <https://prefect2.lizard.net/api/flows/filter>
3. Authorization: Basic Auth with your username
and password
4. Headers:
- Key: Content-Type
- Value: application/json
5. Body:
- Select raw and ensure the format is set to JSON.
- Paste the clean JSON body.
### JSON Body:
Ensure the JSON body is correctly formatted for the /flows/filter
endpoint:
json
{
"id": {"any_": ["6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"]},
"start_time": {"after_": "2024-07-24T22:00:00.000Z", "before_": "2024-07-31T21:59:59.999Z"},
"state": {"name": {"any_": ["Completed"]}}
}
### Postman Steps
1. Request Type: POST
2. URL: <https://prefect2.lizard.net/api/flows/filter>
3. Authorization: Basic Auth with your username
and password
4. Headers:
- Key: Content-Type
- Value: application/json
5. Body:
- Select raw
- Ensure JSON is chosen in the dropdown.
- Paste the valid JSON body as shown above.
### Script Example Using Requests Library
Here is an example using the requests
library in Python to send a POST request to the correct endpoint:
python
import datetime
import requests
from requests.auth import HTTPBasicAuth
def main():
# Define Basic Auth credentials
username = "your-username"
password = "your-password"
# Define the API URL
api_url = "<https://prefect2.lizard.net/api/flows/filter>"
# Define the UUID of the flow
flow_id = "6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"
# Define the time window
start_time = "2024-07-24T22:00:00.000Z"
end_time = "2024-07-31T21:59:59.999Z"
# Define filters for the flow runs
filters = {
"id": {"any_": [flow_id]},
"start_time": {"after_": start_time, "before_": end_time},
"state": {"name": {"any_": ["Completed"]}}
}
# Log filters
print(f"Using filters: {filters}")
# Make API call with Basic Auth
response = <http://requests.post|requests.post>(api_url, json=filters, auth=HTTPBasicAuth(username, password))
# Log response status and content
print(f"Response Status: {response.status_code}")
print(f"Response Content: {response.json()}")
response.raise_for_status() # Raise an error for bad responses
if __name__ == "__main__":
main()
### Summary
1. Ensure Correct Endpoint: Use /api/flows/filter
instead of /flow_runs/filter
.
2. Properly Format JSON: Double-check JSON formatting to avoid extra characters or invisible delimiters.
3. Test with cURL: If testing in the browser isn't feasible, use cURL
or tools like Postman.
4. Validate Request: Monitor request response to verify correctness.
This should align the request with accurate Prefect API usage for retrieving flow runs or filtering specific flow details. If the issues persist or other details arise from responses, kindly share them for further adjustments.Thomas Gerardu
08/01/2024, 1:28 PMMarvin
08/01/2024, 1:29 PMflow_runs/filter
endpoint and provide the flow ID as a filter.
### Correct Approach
1. Query the flow_runs/filter
Endpoint.
2. Provide the Flow ID to filter the flow runs that belong to that flow.
3. Filter by State to get the count of successfully completed flow runs.
Let’s define the correct process to achieve this using the flow_runs/filter
endpoint.
### Steps:
1. Set up filters to include flow ID and the state (Completed).
2. Make a POST request to the flow_runs/filter
endpoint.
3. Parse the response to get the count of completed flow runs.
### Example Using requests
Library:
python
import datetime
import requests
from requests.auth import HTTPBasicAuth
def main():
# Define Basic Auth credentials
username = "your-username"
password = "your-password"
# Define the API URL
api_url = "<https://prefect2.lizard.net/api/flow_runs/filter>"
# Define the UUID of the flow
flow_id = "6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"
# Define the time window
start_time = "2024-07-24T22:00:00.000Z"
end_time = "2024-07-31T21:59:59.999Z"
# Define filters for the flow runs
filters = {
"flow_id": {"any_": [flow_id]},
"start_time": {"after_": start_time, "before_": end_time},
"state": {"name": {"any_": ["Completed"]}}
}
# Log filters
print(f"Using filters: {filters}")
# Make API call with Basic Auth
response = <http://requests.post|requests.post>(api_url, json=filters, auth=HTTPBasicAuth(username, password))
# Log response status and content
print(f"Response Status: {response.status_code}")
response.raise_for_status() # Raise an error for bad responses
response_json = response.json()
print(f"Response Content: {response_json}")
# Extract and count flow runs
completed_runs = response_json
num_completed_runs = len(completed_runs)
# Output the results
print(f"Number of successfully completed runs: {num_completed_runs}")
if __name__ == "__main__":
main()
### Key Points:
1. Endpoint: Use /api/flow_runs/filter
for querying flow runs.
2. Filters:
- `flow_id`: Filters by the UUID of the flow.
- `start_time`: Specifies the time window for the flow runs.
- `state`: Filters by the state name, e.g., Completed
.
3. Authentication: Uses Basic Auth with your username and password.
### Testing with cURL:
For quick verification, you can use cURL
to test the endpoint:
sh
curl -X POST "<https://prefect2.lizard.net/api/flow_runs/filter>" \
-u "your-username:your-password" \
-H "Content-Type: application/json" \
-d '{
"flow_id": {"any_": ["6c339f45-7ca1-4fb9-a50f-a4ec29d2d0d3"]},
"start_time": {"after_": "2024-07-24T22:00:00.000Z", "before_": "2024-07-31T21:59:59.999Z"},
"state": {"name": {"any_": ["Completed"]}}
}'
### Summary:
- Correct Endpoint: /api/flow_runs/filter
- Proper Filtering: Filter by flow_id
, start_time
, and state
to get the completed runs.
- Authentication and Headers: Ensure correct headers and basic auth.
- Parsing Response: Extract the count directly from the JSON response.
By following this corrected approach, you should be able to consistently and accurately retrieve the number of successfully completed flow runs for the given flow.