Dijar Vrella
06/26/2024, 2:08 PMMarvin
06/26/2024, 2:08 PMpython
from prefect import Client
from prefect.utilities.logging import get_run_logger
import datetime
# Initialize the Prefect Client
client = Client()
# Fetch the last deployment
deployments = client.deployments()
last_deployment = deployments[-1] # Assuming the last one in the list is the one you want
print(f"Last Deployment ID: {last_deployment.id}")
# Run the last deployment
flow_run = client.create_flow_run_from_deployment(deployment_id=last_deployment.id)
print(f"Flow Run ID: {flow_run.id}")
# Wait for flow run to complete (you can implement a polling logic as per your requirements)
print("Waiting for flow run to complete...")
# Optionally, some waiting logic here
# Retrieve logs from the flow run
flow_run_logs = client.get_logs(flow_run_id=flow_run.id)
for log in flow_run_logs:
print(f"{log.timestamp} - {log.level}: {log.message}")
print("Logs retrieved successfully.")
Make sure to update the script with proper waiting logic if you need to wait for the flow run to complete and for authenticating with your specific Prefect environment.
For further details, you can refer to:
- Prefect Deployments
- Prefect LoggingDijar Vrella
06/26/2024, 2:09 PM<https://orion-docs.prefect.io/migration_guide/>
. Otherwise ensure that your code is pinned to the expected version.
warnings.warn(
Traceback (most recent call last):
File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 1, in <module>
from prefect import Client
ImportError: cannot import name 'Client' from 'prefect' (/home/seedoo/.local/lib/python3.10/site-packages/prefect/__init__.py)Marvin
06/26/2024, 2:09 PMClient
class doesn't exist in Prefect 2.x. Instead, we utilize the Prefect Orion API.
Here’s how you can achieve fetching the last deployment, running it, and retrieving logs in Prefect 2.x by utilizing Prefect's Orion client:
python
from prefect.client.orion import get_client
from prefect.orion.schemas.filters import FlowFilter, DeploymentFilter
from prefect.orion.schemas.core import FlowRun
import asyncio
async def trigger_deployment():
async with get_client() as client:
# Fetch the last deployment
deployments = await client.read_deployments()
last_deployment = deployments[-1] # Assuming this is the correct logic to fetch the last one
print(f"Last Deployment ID: {last_deployment.id}")
# Run the last deployment
flow_run = await client.create_flow_run_from_deployment(
deployment_id=last_deployment.id
)
print(f"Flow Run ID: {flow_run.id}")
# Wait for flow run to complete (simple polling logic)
while True:
flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
if flow_run_state in {"Completed", "Failed", "Cancelled"}:
break
print(f"Current state: {flow_run_state}. Waiting to complete...")
await asyncio.sleep(10)
# Retrieve logs from the flow run
logs = await client.read_logs(flow_run_id=flow_run.id)
for log in logs:
print(f"{log.timestamp} - {log.level}: {log.message}")
# Run the asyncio event loop
asyncio.run(trigger_deployment())
Be sure to replace the logic for determining the "last deployment" with your actual requirement since deployments[-1]
is a simple assumption. This script retrieves the last deployment, triggers a flow run from that deployment, and then polls to wait for its completion and gets logs. For more advanced usage or specific configurations, refer to the following:
- Prefect Deployments
- Prefect Logging
- Migration GuideDijar Vrella
06/26/2024, 2:12 PMMarvin
06/26/2024, 2:12 PMfrom prefect.client.orchestration import get_client
. Let's update the script to correctly import the client and proceed with fetching the last deployment, executing it, and retrieving logs.
Here's your updated code:
python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, DeploymentFilter
import asyncio
async def trigger_deployment():
async with get_client() as client:
# Fetch the last deployment
deployments = await client.read_deployments()
last_deployment = deployments[-1] # Assuming this is the correct logic to fetch the last one
print(f"Last Deployment ID: {last_deployment.id}")
# Run the last deployment
flow_run = await client.create_flow_run_from_deployment(
deployment_id=last_deployment.id
)
print(f"Flow Run ID: {flow_run.id}")
# Wait for flow run to complete (simple polling logic)
while True:
flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
if flow_run_state in {"Completed", "Failed", "Cancelled"}:
break
print(f"Current state: {flow_run_state}. Waiting to complete...")
await asyncio.sleep(10)
# Retrieve logs from the flow run
logs = await client.read_logs(flow_run_id=flow_run.id)
for log in logs:
print(f"{log.timestamp} - {log.level}: {log.message}")
# Run the asyncio event loop
asyncio.run(trigger_deployment())
Make sure you have the appropriate Prefect version installed and that your environment is correctly set up. This script should now work without the import errors you were encountering.
For reference, you can look at:
- Prefect 2.x Official DocumentationDijar Vrella
06/26/2024, 2:19 PMMarvin
06/26/2024, 2:19 PMread_logs
with a flow_run_id
keyword argument. Instead, you may need to query logs via the Prefect REST API or use filters with the orion
client to get the related flow run logs.
Let's use a different approach to retrieve logs by filtering based on the flow run ID:
python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, LogFilter, LogRead, DeploymentsFilter
import asyncio
async def trigger_deployment():
async with get_client() as client:
# Fetch the last deployment
deployments = await client.read_deployments()
last_deployment = deployments[-1] # Assuming this is the correct logic to fetch the last one
print(f"Last Deployment ID: {last_deployment.id}")
# Run the last deployment
flow_run = await client.create_flow_run_from_deployment(
deployment_id=last_deployment.id
)
print(f"Flow Run ID: {flow_run.id}")
# Wait for flow run to complete (simple polling logic)
while True:
flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
if flow_run_state in {"Completed", "Failed", "Cancelled"}:
break
print(f"Current state: {flow_run_state}. Waiting to complete...")
await asyncio.sleep(10)
# Retrieve logs from the flow run
logs_filter = LogFilter(
flow_run_id={"any_": [flow_run.id]}
)
logs = await client.read_logs(logs_filter)
for log in logs:
print(f"{log.timestamp} - {log.level}: {log.message}")
# Run the asyncio event loop
asyncio.run(trigger_deployment())
This updated code uses the LogFilter
to filter logs based on the flow run ID and retrieve the logs accordingly. Make sure your Prefect environment is correctly set up for this approach.
Additionally, if you need more details or want to verify, consider referring to the Prefect 2.0 Logging Documentation.Dijar Vrella
06/26/2024, 2:20 PMMarvin
06/26/2024, 2:20 PMpython
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, LogFilter
import asyncio
async def trigger_deployment():
async with get_client() as client:
# Fetch the last deployment
deployments = await client.read_deployments()
last_deployment = deployments[-1] # Assuming this is the correct logic to fetch the last one
print(f"Last Deployment ID: {last_deployment.id}")
# Run the last deployment
flow_run = await client.create_flow_run_from_deployment(
deployment_id=last_deployment.id
)
print(f"Flow Run ID: {flow_run.id}")
# Wait for flow run to complete (simple polling logic)
while True:
flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
if flow_run_state in {"Completed", "Failed", "Cancelled"}:
break
print(f"Current state: {flow_run_state}. Waiting to complete...")
await asyncio.sleep(10)
# Retrieve logs from the flow run
logs_filter = {
"flow_run_id": {"any_": [flow_run.id]}
}
logs = await client.read_logs(logs_filter=logs_filter)
for log in logs:
print(f"{log.timestamp} - {log.level_name}: {log.message}")
# Run the asyncio event loop
asyncio.run(trigger_deployment())
Key changes:
1. Removed the LogRead
import, which was incorrect.
2. Adjusted the call to read_logs
to use a logs_filter
dictionary instead of a FlowRunFilter
.
Please ensure you have the appropriate Prefect version installed and that your environment is configured correctly.Dijar Vrella
06/26/2024, 2:21 PMMarvin
06/26/2024, 2:21 PMlogs_filter
as a parameter for read_logs
. Let's adjust the approach to properly retrieve logs without causing argument errors.
We'll check the available methods in the Prefect 2.x API to correctly retrieve and display logs. To be safe, I'll avoid parameterizations that might not be supported. Instead, I'll default to filtering results manually if necessary.
Here's an updated version of the script that correctly retrieves logs after the flow run:
python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter
import asyncio
async def trigger_deployment():
async with get_client() as client:
# Fetch the last deployment
deployments = await client.read_deployments()
last_deployment = deployments[-1] # Assuming this is the correct logic to fetch the last one
print(f"Last Deployment ID: {last_deployment.id}")
# Run the last deployment
flow_run = await client.create_flow_run_from_deployment(
deployment_id=last_deployment.id
)
print(f"Flow Run ID: {flow_run.id}")
# Wait for flow run to complete (simple polling logic)
while True:
flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
if flow_run_state in {"Completed", "Failed", "Cancelled"}:
break
print(f"Current state: {flow_run_state}. Waiting to complete...")
await asyncio.sleep(10)
# Retrieve logs from the flow run
logs = await client.read_logs(flow_run.id)
for log in logs:
print(f"{log.timestamp} - {log.level_name}: {log.message}")
# Run the asyncio event loop
asyncio.run(trigger_deployment())
Changes:
1. Removed the logs_filter
argument.
2. Corrected the call to read_logs
by directly passing flow_run.id
.
This should fix the TypeError
while correctly querying logs for the specified flow run. Ensure your Prefect setup and environment meet these requirements.Dijar Vrella
06/26/2024, 2:23 PMMarvin
06/26/2024, 2:23 PMpython3 trigger-deployment.pyLast Deployment ID: 9513d0a8-950b-4b68-9581-32a59bda591c Flow Run ID: f3456c8b-645d-4af6-a1b2-56a60001183e Current state: Scheduled. Waiting to complete... Current state: Pending. Waiting to complete... Current state: Running. Waiting to complete... Traceback (most recent call last): File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 32, in <module> asyncio.run(trigger_deployment()) File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run return loop.run_until_complete(main) File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete return future.result() File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 27, in trigger_deployment logs = await client.read_logs(flow_run.id) File "/home/seedoo/.local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2522, in read_logs "logs": log_filter.dict(json_compatible=True) if log_filter else None, AttributeError:
Marvin
06/26/2024, 2:23 PMclient.read_logs
was not correctly called in the previously provided script. read_logs
requires the use of a filter object. To construct and pass the correct filter object, we need to use classes such as LogFilter
.
Below is the corrected script:
python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, LogFilter
import asyncio
async def trigger_deployment():
async with get_client() as client:
# Fetch the last deployment
deployments = await client.read_deployments()
last_deployment = deployments[-1] # Assuming this is the correct logic to fetch the last one
print(f"Last Deployment ID: {last_deployment.id}")
# Run the last deployment
flow_run = await client.create_flow_run_from_deployment(
deployment_id=last_deployment.id
)
print(f"Flow Run ID: {flow_run.id}")
# Wait for flow run to complete (simple polling logic)
while True:
flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
if flow_run_state in {"Completed", "Failed", "Cancelled"}:
break
print(f"Current state: {flow_run_state}. Waiting to complete...")
await asyncio.sleep(10)
# Retrieve logs from the flow run
logs_filter = LogFilter(flow_run_id={"any_": [str(flow_run.id)]})
logs = await client.read_logs(log_filter=logs_filter)
for log in logs:
print(f"{log.timestamp} - {log.level}: {log.message}")
# Run the asyncio event loop
asyncio.run(trigger_deployment())
Changes:
1. Correctly formatted the LogFilter
to pass as log_filter
argument.
2. Converted flow_run.id
to a string within the LogFilter
.
Please ensure you are using Prefect v2.0 or later and have all necessary dependencies installed for the script to work correctly. This should resolve the AttributeError
by providing the correctly constructed filter object.