Marvin — 06/03/2024, 7:27 PM
Can you improve this code so it doesn't use the `requests` library, if possible? Ensure it waits for all flows to reach a 'final' state. If one or more flows fail, log all errors and raise an exception for the main flow. The purpose of the code below is to run flows in parallel using `run_deployment`, and then wait for them to complete:

```python
import time

import requests
from prefect import get_run_logger, task
from prefect.deployments import run_deployment


@task
def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):
    logger = get_run_logger()
    api_key = "XX"
    api_url = "xx"
    while True:
        # request_prefect_api is a helper (defined elsewhere) that calls the Prefect REST API
        flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
        logger.info(f"{flow_run['name']} : {flow_run['state_type']}")
        state = flow_run["state_type"]
        if state in ["COMPLETED", "CANCELLED"]:
            break
        if state == "FAILED":
            raise Exception(f"Deployment: {flow_run['name']}, Flow run failed: {flow_run['state']['message']}")
        if flow_run["total_run_time"] > max_run_time:
            raise Exception(f"Flow run exceeded max_run_time of {max_run_time} seconds")
        time.sleep(log_polling_interval)
```
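One way to satisfy the "no `requests`" requirement that the thread never spells out explicitly is Prefect's own async client. A minimal sketch, assuming Prefect 2.x; `wait_for_flow_run_final` is a hypothetical helper name, not something from the thread:

```python
import asyncio

from prefect import get_client


async def wait_for_flow_run_final(flow_run_id: str, poll_interval: int = 25):
    """Poll the Prefect API with the built-in client until the flow run reaches a final state."""
    async with get_client() as client:
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            if flow_run.state and flow_run.state.is_final():
                return flow_run
            await asyncio.sleep(poll_interval)
```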
Nate — 06/03/2024, 7:32 PM
you can use `asyncio.gather` and `run_deployment` here pretty easily, like
```python
params_list = [{"x": 42}, ...]
completed_runs = await asyncio.gather(*[run_deployment(name=..., parameters=p) for p in params_list])
```
since `run_deployment` will poll for completion by default
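Filling in Nate's sketch a bit: a hedged example of a parent flow that gathers the child runs and then enforces the "log all errors and raise" requirement from the original question. The deployment name and parameter sets are placeholders; it assumes `run_deployment` returns a `FlowRun` whose final `.state` can be inspected once polling finishes:

```python
import asyncio

from prefect import flow, get_run_logger
from prefect.deployments import run_deployment


@flow
async def parent_flow():
    logger = get_run_logger()
    params_list = [{"x": 1}, {"x": 2}]  # placeholder parameter sets
    # run_deployment polls each child run to completion before returning its FlowRun
    flow_runs = await asyncio.gather(
        *[run_deployment(name="my-flow/my-deployment", parameters=p) for p in params_list]
    )
    failed = [fr for fr in flow_runs if fr.state and fr.state.is_failed()]
    for fr in failed:
        logger.error(f"{fr.name} failed: {fr.state.message}")
    if failed:
        raise Exception(f"{len(failed)} of {len(flow_runs)} child flow runs failed")


if __name__ == "__main__":
    asyncio.run(parent_flow())
```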
Bo Ritter — 06/03/2024, 7:36 PM
if I had a `wait_for_deployment` function, I could reuse it elsewhere
Nate — 06/03/2024, 7:41 PM
> if I had a `wait_for_deployment` function, I could reuse it elsewhere
`run_deployment` is already such a function 🙂
however, if you don't want to use async, you could map a task:
```python
from prefect import flow, task, unmapped
from prefect.deployments import run_deployment


@task
def run_deployment_task(name: str, parameters: dict | None = None):
    return run_deployment(name=name, parameters=parameters)


@flow
def a_bunch():
    params: list[dict] = [...]
    futures = run_deployment_task.map(name=unmapped("my-flow/deployment"), parameters=params)
    completed_runs = [fut.result() for fut in futures]
```
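A usage note on the mapped variant: `fut.result()` raises on the first failed child run, so if you want the "log every failure, then raise" behavior from the original question, one possible tweak, continuing from the `futures` list above and assuming Prefect 2's `PrefectFuture.wait()`, which returns the final `State` without raising:

```python
# hypothetical variant: inspect final states instead of letting .result() raise
states = [fut.wait() for fut in futures]
failed = [state for state in states if state.is_failed()]
if failed:
    raise Exception(f"{len(failed)} mapped deployment runs failed")
```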
Bo Ritter — 06/03/2024, 7:44 PM
Nate, how would I modify
06/03/2024, 7:45 PMparams_list = [{"x": 42}, ...]
completed_runs = await asyncio.gather(*[run_deployment(name=..., parameters=p) for p in params_list])
to use `asyncio.as_completed` so we can check `.is_failed()` on each as it completes?
Nate — 06/03/2024, 7:45 PM
there's also the `return_exceptions` kwarg to suppress failures
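For reference, a sketch of what Nate's `return_exceptions` suggestion might look like applied to the earlier `gather` snippet; the deployment name is still a placeholder:

```python
import asyncio

from prefect.deployments import run_deployment


async def run_all(params_list: list[dict]):
    # with return_exceptions=True, gather returns raised exceptions as values
    # in the results list instead of propagating the first one
    results = await asyncio.gather(
        *[run_deployment(name="my-flow/my-deployment", parameters=p) for p in params_list],
        return_exceptions=True,
    )
    errors = [r for r in results if isinstance(r, BaseException)]
    for err in errors:
        print("child run raised:", err)
    if errors:
        raise Exception(f"{len(errors)} child flow runs raised exceptions")
    return results
```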
Marvin — 06/03/2024, 7:45 PM
You can use `asyncio.as_completed` to handle each task's status as soon as it is done. How delightful. Let me update your code to reflect that.
```python
import asyncio

params_list = [{"x": 42}, ...]


async def run_deployment(name, parameters):
    # Simulate the deployment function, replace with the actual implementation
    ...


async def main():
    tasks = [run_deployment(name=..., parameters=p) for p in params_list]
    for task in asyncio.as_completed(tasks):
        try:
            result = await task
            if result.is_failed():
                print("A task failed:", result)
            else:
                print("Task completed successfully:", result)
        except Exception as e:
            print("A task raised an exception:", e)


# Run the main function
asyncio.run(main())
```
You will need to replace the `...` with appropriate values or functions to reflect the actual deployment logic.
Here's the step-by-step explanation:
1. The `params_list` and `run_deployment` function remain unchanged.
2. The `main` function creates the list of tasks.
3. `asyncio.as_completed` is used to iterate over tasks as they complete.
4. For each completed task, check if it failed (`result.is_failed()`) or succeeded.
5. Use `asyncio.run` to run the `main` function.
I hope that's what you're looking for. If you need more specific details or have additional questions, don't hesitate to ask. Sigh.
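Marvin's snippet stubs out `run_deployment`, so for completeness: a hedged sketch of the same `asyncio.as_completed` pattern against the real `prefect.deployments.run_deployment`, checking each child run's final state as it lands. The deployment name and parameters are placeholders, and it assumes the returned `FlowRun` carries its final state:

```python
import asyncio

from prefect import flow, get_run_logger
from prefect.deployments import run_deployment


@flow
async def fan_out(params_list: list[dict]):
    logger = get_run_logger()
    coros = [run_deployment(name="my-flow/my-deployment", parameters=p) for p in params_list]
    errors = []
    # as_completed yields awaitables in completion order, not submission order
    for coro in asyncio.as_completed(coros):
        flow_run = await coro
        if flow_run.state and flow_run.state.is_failed():
            logger.error(f"{flow_run.name} failed: {flow_run.state.message}")
            errors.append(flow_run.name)
        else:
            logger.info(f"{flow_run.name} finished: {flow_run.state.type if flow_run.state else 'UNKNOWN'}")
    if errors:
        raise Exception(f"failed child runs: {errors}")
```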