<@ULVA73B9P> Can you improve this code so it doesn't use the 'requests' library if possible. Ensure ...
b
@Marvin Can you improve this code so it doesn't use the 'requests' library if possible. Ensure it waits for all flows to reach a 'final' state. If one or more flows fail, log all errors and raise an exception for the main flow. The purpose of the below code is to be able to run flows using run_deployment in parallel, and then wait for them to complete.
request_prefect_api
sends a request to the prefect api
Copy code
@task
def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):

    logger = get_run_logger()

    api_key = "XX"
    api_url = "xx"
    while True:
        flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
        <http://logger.info|logger.info>(f"{flow_run['name']} : {flow_run['state_type']}")

        state = flow_run["state_type"]

        if state in ["COMPLETED", "CANCELLED"]:
            break
        if state == "FAILED":
            raise Exception(f"Deployment: {flow_run['name']}, Flow run failed: {flow_run['state']['message']}")

        time.sleep(log_polling_interval)
m
Certainly. We can make use of the Prefect's
FlowRunner
and async capabilities to run the flows and await their completion status. This will allow us to avoid the
requests
library and handle errors appropriately. Below is the improved version:
Copy code
python
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
import asyncio

@task
async def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):
    logger = get_run_logger()

    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            <http://logger.info|logger.info>(f"{flow_run.name} : {flow_run.state.type}")

            state = flow_run.state.type

            if state in ["COMPLETED", "CANCELLED"]:
                break
            if state == "FAILED":
                raise Exception(f"Deployment: {flow_run.name}, Flow run failed: {flow_run.state.message}")

            await asyncio.sleep(log_polling_interval)

@task
def log_errors(errors: list):
    logger = get_run_logger()
    for error in errors:
        logger.error(error)

@flow
async def run_and_wait_for_deployments(deployments: list):
    # Run deployments in parallel
    coroutine_list = [run_deployment(deployment) for deployment in deployments]
    flow_runs = await asyncio.gather(*coroutine_list)

    # Wait for deployments to complete
    wait_tasks = [wait_for_deployment(flow_run.id) for flow_run in flow_runs]
    
    # Collect errors
    errors = []
    for task in asyncio.as_completed(wait_tasks):
        try:
            await task
        except Exception as e:
            errors.append(str(e))
    
    # Log errors and raise main exception if any deployment failed
    if errors:
        log_errors(errors)
        raise Exception("One or more deployments failed. See error logs.")

# Example usage
if __name__ == "__main__":
    deployment_list = ["deployment1", "deployment2", "deployment3"]  # Replace with actual deployment names or ids
    run_and_wait_for_deployments(deployment_list)
Changes made: 1. Replaced the
requests
library with
get_client
from
prefect.client.orchestration
to interact with the Prefect API. 2. Refactored
wait_for_deployment
to use
async
and
await
which aligns better with asyncio's non-blocking nature. 3. Used
asyncio.gather
to run the deployment flows in parallel and
asyncio.as_completed
to handle errors as they arise. 4. Added error logging through a separate task
log_errors
. Note: You’ll need to have the Prefect client and appropriate settings configured for the async client to work.