Bo Ritter
06/03/2024, 7:37 PMrequest_prefect_api
sends a request to the prefect api
@task
def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):
logger = get_run_logger()
api_key = "XX"
api_url = "xx"
while True:
flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
<http://logger.info|logger.info>(f"{flow_run['name']} : {flow_run['state_type']}")
state = flow_run["state_type"]
if state in ["COMPLETED", "CANCELLED"]:
break
if state == "FAILED":
raise Exception(f"Deployment: {flow_run['name']}, Flow run failed: {flow_run['state']['message']}")
time.sleep(log_polling_interval)
Marvin
06/03/2024, 7:37 PMFlowRunner
and async capabilities to run the flows and await their completion status. This will allow us to avoid the requests
library and handle errors appropriately. Below is the improved version:
python
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
from prefect.client.orchestration import get_client
import asyncio
@task
async def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):
logger = get_run_logger()
async with get_client() as client:
while True:
flow_run = await client.read_flow_run(flow_run_id)
<http://logger.info|logger.info>(f"{flow_run.name} : {flow_run.state.type}")
state = flow_run.state.type
if state in ["COMPLETED", "CANCELLED"]:
break
if state == "FAILED":
raise Exception(f"Deployment: {flow_run.name}, Flow run failed: {flow_run.state.message}")
await asyncio.sleep(log_polling_interval)
@task
def log_errors(errors: list):
logger = get_run_logger()
for error in errors:
logger.error(error)
@flow
async def run_and_wait_for_deployments(deployments: list):
# Run deployments in parallel
coroutine_list = [run_deployment(deployment) for deployment in deployments]
flow_runs = await asyncio.gather(*coroutine_list)
# Wait for deployments to complete
wait_tasks = [wait_for_deployment(flow_run.id) for flow_run in flow_runs]
# Collect errors
errors = []
for task in asyncio.as_completed(wait_tasks):
try:
await task
except Exception as e:
errors.append(str(e))
# Log errors and raise main exception if any deployment failed
if errors:
log_errors(errors)
raise Exception("One or more deployments failed. See error logs.")
# Example usage
if __name__ == "__main__":
deployment_list = ["deployment1", "deployment2", "deployment3"] # Replace with actual deployment names or ids
run_and_wait_for_deployments(deployment_list)
Changes made:
1. Replaced the requests
library with get_client
from prefect.client.orchestration
to interact with the Prefect API.
2. Refactored wait_for_deployment
to use async
and await
which aligns better with asyncio's non-blocking nature.
3. Used asyncio.gather
to run the deployment flows in parallel and asyncio.as_completed
to handle errors as they arise.
4. Added error logging through a separate task log_errors
.
Note: You’ll need to have the Prefect client and appropriate settings configured for the async client to work.Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by