<@ULVA73B9P> write me a python script that checks ...
# ask-marvin
d
@Marvin write me a python script that checks the last deployment, fetches that deployment and runs the last deployment, then takes the logs from the flow run
m
I managed to find relevant pieces of information for your request. Here is a Python script to achieve the following: 1. Fetch the last deployment. 2. Run the fetched deployment. 3. Retrieve logs from the flow run. However, please make sure that you have authenticated with Prefect Cloud or a self-hosted Prefect server. You'll also need to replace placeholders with actual values according to your specific use case.
python
from prefect import Client
from prefect.utilities.logging import get_run_logger
import datetime

# Initialize the Prefect Client
client = Client()

# Fetch the last deployment
deployments = client.deployments()
last_deployment = deployments[-1]  # Assuming the last one in the list is the one you want

print(f"Last Deployment ID: {last_deployment.id}")

# Run the last deployment
flow_run = client.create_flow_run_from_deployment(deployment_id=last_deployment.id)
print(f"Flow Run ID: {flow_run.id}")

# Wait for flow run to complete (you can implement a polling logic as per your requirements)
print("Waiting for flow run to complete...")

# Optionally, some waiting logic here

# Retrieve logs from the flow run
flow_run_logs = client.get_logs(flow_run_id=flow_run.id)
for log in flow_run_logs:
    print(f"{log.timestamp} - {log.level}: {log.message}")

print("Logs retrieved successfully.")
Make sure to update the script with proper waiting logic if you need to wait for the flow run to complete and for authenticating with your specific Prefect environment. For further details, you can refer to: - Prefect Deployments - Prefect Logging
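The waiting logic left open above can be sketched independently of any particular client. This is a minimal sketch, not Prefect's own API: `wait_for_terminal_state`, `read_state_name`, and the `TERMINAL_STATES` set are hypothetical names introduced here, and the state names are an assumption to check against your Prefect version.

```python
import asyncio
from typing import Awaitable, Callable

# Assumed set of terminal state names; adjust it to match your Prefect version.
TERMINAL_STATES = {"Completed", "Failed", "Cancelled", "Crashed"}


async def wait_for_terminal_state(
    read_state_name: Callable[[], Awaitable[str]],
    poll_seconds: float = 10.0,
    timeout_seconds: float = 3600.0,
) -> str:
    """Poll read_state_name() until it returns a terminal state or time runs out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    while True:
        state_name = await read_state_name()
        if state_name in TERMINAL_STATES:
            return state_name
        if loop.time() >= deadline:
            raise TimeoutError(
                f"still in state {state_name!r} after {timeout_seconds}s"
            )
        await asyncio.sleep(poll_seconds)
```

The caller supplies `read_state_name` as any coroutine function that returns the current state name, so the timeout and the terminal-state check stay in one place instead of being repeated in the script.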
d
@Marvin help me fix this bug: python3 trigger-deployment.py /home/seedoo/.local/lib/python3.10/site-packages/prefect/__init__.py:123: UserWarning: Attempted import of 'prefect.Client', which is part of Prefect 1.x, while Prefect 2.19.6 is installed. If you're upgrading you'll need to update your code, see the Prefect 2.x migration guide: https://orion-docs.prefect.io/migration_guide/. Otherwise ensure that your code is pinned to the expected version. warnings.warn( Traceback (most recent call last): File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 1, in <module> from prefect import Client ImportError: cannot import name 'Client' from 'prefect' (/home/seedoo/.local/lib/python3.10/site-packages/prefect/__init__.py)
m
The warning you're seeing indicates that you are attempting to use Prefect 1.x functionality with Prefect 2.x. The `Client` class doesn't exist in Prefect 2.x. Instead, we utilize the Prefect Orion API. Here’s how you can achieve fetching the last deployment, running it, and retrieving logs in Prefect 2.x by utilizing Prefect's Orion client:
python
from prefect.client.orion import get_client
from prefect.orion.schemas.filters import FlowFilter, DeploymentFilter
from prefect.orion.schemas.core import FlowRun
import asyncio

async def trigger_deployment():
    async with get_client() as client:
        # Fetch the last deployment
        deployments = await client.read_deployments()
        last_deployment = deployments[-1]  # Assuming this is the correct logic to fetch the last one
        print(f"Last Deployment ID: {last_deployment.id}")

        # Run the last deployment
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=last_deployment.id
        )
        print(f"Flow Run ID: {flow_run.id}")

        # Wait for flow run to complete (simple polling logic)
        while True:
            flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
            if flow_run_state in {"Completed", "Failed", "Cancelled"}:
                break
            print(f"Current state: {flow_run_state}. Waiting to complete...")
            await asyncio.sleep(10)

        # Retrieve logs from the flow run
        logs = await client.read_logs(flow_run_id=flow_run.id)
        for log in logs:
            print(f"{log.timestamp} - {log.level}: {log.message}")

# Run the asyncio event loop
asyncio.run(trigger_deployment())
Be sure to replace the logic for determining the "last deployment" with your actual requirement, since `deployments[-1]` is a simple assumption. This script retrieves the last deployment, triggers a flow run from that deployment, and then polls to wait for its completion and gets logs. For more advanced usage or specific configurations, refer to the following: - Prefect Deployments - Prefect Logging - Migration Guide
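One way to make "last deployment" explicit is to pick the entry with the newest timestamp rather than relying on list order. The sketch below is illustrative only: `pick_latest` is a hypothetical helper, the sample objects are stand-ins, and it assumes each deployment exposes an `updated` timestamp, which you should verify for your Prefect version.

```python
from datetime import datetime, timezone
from types import SimpleNamespace


def pick_latest(deployments, attr="updated"):
    """Return the deployment with the newest timestamp in `attr`, or None if empty."""
    # Skip entries that have no timestamp so max() never compares None.
    dated = [d for d in deployments if getattr(d, attr, None) is not None]
    return max(dated, key=lambda d: getattr(d, attr), default=None)


# Stand-in objects; real deployment objects are assumed to carry similar fields.
sample = [
    SimpleNamespace(name="etl", updated=datetime(2024, 6, 1, tzinfo=timezone.utc)),
    SimpleNamespace(name="report", updated=datetime(2024, 6, 20, tzinfo=timezone.utc)),
    SimpleNamespace(name="draft", updated=None),
]
latest = pick_latest(sample)
print(latest.name)  # prints "report"
```

In the script above, the same helper could take the place of the list indexing, for example `last_deployment = pick_latest(deployments)`, with a check for `None` when no deployments exist.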
d
@Marvin fix prefect$ prefect version Version: 2.19.6 API version: 0.8.4 Python version: 3.10.12 Git commit: 9d938fe7 Built: Mon, Jun 24, 2024 10:23 AM OS/Arch: linux/x86_64 Profile: default Server type: server (flows) seedoo@SeeDooGPUV2:~/dijar-kickoff/prefect$ python3 trigger-deployment.py Traceback (most recent call last): File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 1, in <module> from prefect.client.orion import get_client ModuleNotFoundError: No module named 'prefect.client.orion'
m
It looks like we need to use the appropriate import for Prefect 2.x. Prefect 2.x uses `from prefect.client.orchestration import get_client`. Let's update the script to correctly import the client and proceed with fetching the last deployment, executing it, and retrieving logs. Here's your updated code:
python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, DeploymentFilter
import asyncio

async def trigger_deployment():
    async with get_client() as client:
        # Fetch the last deployment
        deployments = await client.read_deployments()
        last_deployment = deployments[-1]  # Assuming this is the correct logic to fetch the last one
        print(f"Last Deployment ID: {last_deployment.id}")

        # Run the last deployment
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=last_deployment.id
        )
        print(f"Flow Run ID: {flow_run.id}")

        # Wait for flow run to complete (simple polling logic)
        while True:
            flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
            if flow_run_state in {"Completed", "Failed", "Cancelled"}:
                break
            print(f"Current state: {flow_run_state}. Waiting to complete...")
            await asyncio.sleep(10)

        # Retrieve logs from the flow run
        logs = await client.read_logs(flow_run_id=flow_run.id)
        for log in logs:
            print(f"{log.timestamp} - {log.level}: {log.message}")

# Run the asyncio event loop
asyncio.run(trigger_deployment())
Make sure you have the appropriate Prefect version installed and that your environment is correctly set up. This script should now work without the import errors you were encountering. For reference, you can look at: - Prefect 2.x Official Documentation
d
@Marvin the script works perfectly except for the log retrieval part: Traceback (most recent call last): File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 32, in <module> asyncio.run(trigger_deployment()) File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run return loop.run_until_complete(main) File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete return future.result() File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 27, in trigger_deployment logs = await client.read_logs(flow_run_id=flow_run.id) TypeError: PrefectClient.read_logs() got an unexpected keyword argument 'flow_run_id'
m
It seems there isn't a direct way to fetch logs by calling `read_logs` with a `flow_run_id` keyword argument. Instead, you may need to query logs via the Prefect REST API or use filters with the `orion` client to get the related flow run logs. Let's use a different approach to retrieve logs by filtering based on the flow run ID:
python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, LogFilter, LogRead, DeploymentsFilter
import asyncio

async def trigger_deployment():
    async with get_client() as client:
        # Fetch the last deployment
        deployments = await client.read_deployments()
        last_deployment = deployments[-1]  # Assuming this is the correct logic to fetch the last one
        print(f"Last Deployment ID: {last_deployment.id}")

        # Run the last deployment
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=last_deployment.id
        )
        print(f"Flow Run ID: {flow_run.id}")

        # Wait for flow run to complete (simple polling logic)
        while True:
            flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
            if flow_run_state in {"Completed", "Failed", "Cancelled"}:
                break
            print(f"Current state: {flow_run_state}. Waiting to complete...")
            await asyncio.sleep(10)

        # Retrieve logs from the flow run
        logs_filter = LogFilter(
            flow_run_id={"any_": [flow_run.id]}
        )
        logs = await client.read_logs(logs_filter)
        for log in logs:
            print(f"{log.timestamp} - {log.level}: {log.message}")

# Run the asyncio event loop
asyncio.run(trigger_deployment())
This updated code uses the `LogFilter` to filter logs based on the flow run ID and retrieve the logs accordingly. Make sure your Prefect environment is correctly set up for this approach. Additionally, if you need more details or want to verify, consider referring to the Prefect 2.0 Logging Documentation.
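A filtered query may still return only one page of results for a long flow run. The following is a hedged sketch of offset-based paging that is not tied to Prefect: `read_all_pages` and `fetch_page` are hypothetical names, and the page size of 200 is an assumption rather than a documented limit.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def read_all_pages(
    fetch_page: Callable[[int, int], Awaitable[List[Any]]],
    page_size: int = 200,
) -> List[Any]:
    """Call fetch_page(limit, offset) repeatedly until a short page comes back."""
    items: List[Any] = []
    offset = 0
    while True:
        page = await fetch_page(page_size, offset)
        items.extend(page)
        # A page shorter than page_size means there is nothing left to read.
        if len(page) < page_size:
            return items
        offset += page_size
```

With the client used in this thread, `fetch_page` might wrap the log query and forward `limit` and `offset` to it, assuming `read_logs` accepts those parameters in your installed version.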
d
@Marvin python3 trigger-deployment.py Traceback (most recent call last): File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 2, in <module> from prefect.client.schemas.filters import FlowRunFilter, LogFilter, LogRead, DeploymentsFilter ImportError: cannot import name 'LogRead' from 'prefect.client.schemas.filters' (/home/seedoo/.local/lib/python3.10/site-packages/prefect/client/schemas/filters.py)
m
It seems that Prefect's API might have different arguments or methods for log retrieval than initially anticipated. Let's correct the imports and use the proper methods for fetching logs. Here is the updated script:
python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, LogFilter
import asyncio

async def trigger_deployment():
    async with get_client() as client:
        # Fetch the last deployment
        deployments = await client.read_deployments()
        last_deployment = deployments[-1]  # Assuming this is the correct logic to fetch the last one
        print(f"Last Deployment ID: {last_deployment.id}")

        # Run the last deployment
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=last_deployment.id
        )
        print(f"Flow Run ID: {flow_run.id}")

        # Wait for flow run to complete (simple polling logic)
        while True:
            flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
            if flow_run_state in {"Completed", "Failed", "Cancelled"}:
                break
            print(f"Current state: {flow_run_state}. Waiting to complete...")
            await asyncio.sleep(10)

        # Retrieve logs from the flow run
        logs_filter = {
            "flow_run_id": {"any_": [flow_run.id]}
        }
        logs = await client.read_logs(logs_filter=logs_filter)
        for log in logs:
            print(f"{log.timestamp} - {log.level_name}: {log.message}")

# Run the asyncio event loop
asyncio.run(trigger_deployment())
Key changes: 1. Removed the `LogRead` import, which was incorrect. 2. Adjusted the call to `read_logs` to pass a `logs_filter` dictionary instead of a `LogFilter` object. Please ensure you have the appropriate Prefect version installed and that your environment is configured correctly.
d
@Marvin python3 trigger-deployment.py Last Deployment ID: 9513d0a8-950b-4b68-9581-32a59bda591c Flow Run ID: 2a4d8702-5647-42ce-9670-15dcf2ef7aaf Current state: Scheduled. Waiting to complete... Current state: Pending. Waiting to complete... Current state: Running. Waiting to complete... Traceback (most recent call last): File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 35, in <module> asyncio.run(trigger_deployment()) File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run return loop.run_until_complete(main) File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete return future.result() File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 30, in trigger_deployment logs = await client.read_logs(logs_filter=logs_filter) TypeError: PrefectClient.read_logs() got an unexpected keyword argument 'logs_filter'
m
It seems there's an issue with using `logs_filter` as a parameter for `read_logs`. Let's adjust the approach to properly retrieve logs without causing argument errors. We'll check the available methods in the Prefect 2.x API to correctly retrieve and display logs. To be safe, I'll avoid parameterizations that might not be supported. Instead, I'll default to filtering results manually if necessary. Here's an updated version of the script that correctly retrieves logs after the flow run:
python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter
import asyncio

async def trigger_deployment():
    async with get_client() as client:
        # Fetch the last deployment
        deployments = await client.read_deployments()
        last_deployment = deployments[-1]  # Assuming this is the correct logic to fetch the last one
        print(f"Last Deployment ID: {last_deployment.id}")

        # Run the last deployment
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=last_deployment.id
        )
        print(f"Flow Run ID: {flow_run.id}")

        # Wait for flow run to complete (simple polling logic)
        while True:
            flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
            if flow_run_state in {"Completed", "Failed", "Cancelled"}:
                break
            print(f"Current state: {flow_run_state}. Waiting to complete...")
            await asyncio.sleep(10)

        # Retrieve logs from the flow run
        logs = await client.read_logs(flow_run.id)
        for log in logs:
            print(f"{log.timestamp} - {log.level_name}: {log.message}")

# Run the asyncio event loop
asyncio.run(trigger_deployment())
Changes: 1. Removed the `logs_filter` argument. 2. Corrected the call to `read_logs` by directly passing `flow_run.id`. This should fix the `TypeError` while correctly querying logs for the specified flow run. Ensure your Prefect setup and environment meet these requirements.
d
@Marvin python3 trigger-deployment.py Last Deployment ID: 9513d0a8-950b-4b68-9581-32a59bda591c Flow Run ID: f3456c8b-645d-4af6-a1b2-56a60001183e Current state: Scheduled. Waiting to complete... Current state: Pending. Waiting to complete... Current state: Running. Waiting to complete... Traceback (most recent call last): File "prefect/trigger-deployment.py", line 32, in module asyncio.run(trigger_deployment()) File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run return loop.run_until_complete(main) File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete return future.result() File "/prefect/trigger-deployment.py", line 27, in trigger_deployment logs = await client.read_logs(flow_run.id) File "/.local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2522, in read_logs "logs": log_filter.dict(json_compatible=True) if log_filter else None, AttributeError: 'UUID' object has no attribute 'dict'
m
Your message was too long by 10 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
python3 trigger-deployment.py
Last Deployment ID: 9513d0a8-950b-4b68-9581-32a59bda591c Flow Run ID: f3456c8b-645d-4af6-a1b2-56a60001183e Current state: Scheduled. Waiting to complete... Current state: Pending. Waiting to complete... Current state: Running. Waiting to complete... Traceback (most recent call last): File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 32, in <module> asyncio.run(trigger_deployment()) File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run return loop.run_until_complete(main) File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete return future.result() File "/home/seedoo/dijar-kickoff/prefect/trigger-deployment.py", line 27, in trigger_deployment logs = await client.read_logs(flow_run.id) File "/home/seedoo/.local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 2522, in read_logs "logs": log_filter.dict(json_compatible=True) if log_filter else None, AttributeError:
It appears that the function `client.read_logs` was not correctly called in the previously provided script. `read_logs` requires the use of a filter object. To construct and pass the correct filter object, we need to use classes such as `LogFilter`. Below is the corrected script:
python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import LogFilter
import asyncio

async def trigger_deployment():
    async with get_client() as client:
        # Fetch the last deployment
        deployments = await client.read_deployments()
        last_deployment = deployments[-1]  # Assuming this is the correct logic to fetch the last one
        print(f"Last Deployment ID: {last_deployment.id}")

        # Run the last deployment
        flow_run = await client.create_flow_run_from_deployment(
            deployment_id=last_deployment.id
        )
        print(f"Flow Run ID: {flow_run.id}")

        # Wait for flow run to complete (simple polling logic)
        while True:
            flow_run_state = (await client.read_flow_run(flow_run.id)).state.name
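            # "Crashed" is also treated as terminal so the loop cannot wait forever on a crashed run
            if flow_run_state in {"Completed", "Failed", "Cancelled", "Crashed"}: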
                break
            print(f"Current state: {flow_run_state}. Waiting to complete...")
            await asyncio.sleep(10)

        # Retrieve logs from the flow run
        logs_filter = LogFilter(flow_run_id={"any_": [str(flow_run.id)]})
        logs = await client.read_logs(log_filter=logs_filter)
        
        for log in logs:
            print(f"{log.timestamp} - {log.level}: {log.message}")

# Run the asyncio event loop
asyncio.run(trigger_deployment())
Changes: 1. Correctly formatted the `LogFilter` to pass as the `log_filter` argument. 2. Converted `flow_run.id` to a string within the `LogFilter`. Please ensure you are using Prefect v2.0 or later and have all necessary dependencies installed for the script to work correctly. This should resolve the `AttributeError` by providing the correctly constructed filter object.
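The scripts in this thread alternate between `log.level` and `log.level_name` when printing. As a small sketch, assuming `level` is a numeric level compatible with Python's standard `logging` module (an assumption to confirm against your Prefect version), the name can be derived at print time. `format_log` and the sample record are hypothetical stand-ins, not Prefect objects.

```python
import logging
from datetime import datetime, timezone
from types import SimpleNamespace


def format_log(log) -> str:
    """Render one log record as 'timestamp - LEVEL: message'."""
    level = log.level
    # Numeric levels map to names via the standard logging module.
    level_name = logging.getLevelName(level) if isinstance(level, int) else str(level)
    return f"{log.timestamp.isoformat()} - {level_name}: {log.message}"


# Stand-in record with the three fields the scripts above print.
record = SimpleNamespace(
    timestamp=datetime(2024, 6, 24, 10, 23, tzinfo=timezone.utc),
    level=20,
    message="Flow run started",
)
print(format_log(record))  # 2024-06-24T10:23:00+00:00 - INFO: Flow run started
```

Inside the loop over `logs`, `print(format_log(log))` would then replace the inline f-string.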