This message was deleted.
# ask-marvin
s
This message was deleted.
m
Your message was too long by 139 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
Can you improve this code so it doesn't use the 'requests' library if possible. Ensure it waits for all flows to reach a 'final' state. If one or more flows fail, log all errors and raise an exception for the main flow. The purpose of the below code is to be able to run flows using run_deployment in parallel, and then wait for them to complete
```
import time
import requests
from prefect import get_run_logger, task
from prefect.deployments import run_deployment


@task
def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):
    logger = get_run_logger()
    api_key = "XX"
    api_url = "xx"
    while True:
        flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
        logger.info(f"{flow_run['name']} : {flow_run['state_type']}")
        state = flow_run["state_type"]
        if state in ["COMPLETED", "CANCELLED"]:
            break
        if state == "FAILED":
            raise Exception(f"Deployment: {flow_run['name']}, Flow run failed: {flow_run['state']['message']}")
        if flow_run["total_run_time"] > max_run_time:
            raise Exception(f"Flow
```
n
hey whoever posted 🙂
you could probably use `asyncio.gather` and `run_deployment` here pretty easily like
```python
params_list = [{"x": 42}, ...]
completed_runs = await asyncio.gather(*[run_deployment(name=..., parameters=p) for p in params_list])
```
since `run_deployment` will poll for completion by default
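For anyone reading later, here is a minimal runnable sketch of that pattern. `fake_run_deployment` is a hypothetical stand-in that just sleeps, so the snippet runs without Prefect or a real deployment; in an actual flow you would presumably call `run_deployment` in its place.

```python
import asyncio


async def fake_run_deployment(name: str, parameters: dict) -> dict:
    # Hypothetical stand-in for run_deployment: pretend the flow run takes a moment.
    await asyncio.sleep(0.01)
    return {"name": name, "parameters": parameters, "state": "COMPLETED"}


async def run_all(params_list: list[dict]) -> list[dict]:
    # gather schedules every coroutine concurrently and returns results in input order.
    return await asyncio.gather(
        *[fake_run_deployment(name="my-flow/deployment", parameters=p) for p in params_list]
    )


completed_runs = asyncio.run(run_all([{"x": 1}, {"x": 2}, {"x": 3}]))
```

The results come back in the same order as `params_list`, which makes it easy to line each run up with the parameters that produced it.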
b
I was trying to avoid using asyncio if I could, since I feel like it adds a level of complexity to the flows. And with a custom `wait_for_deployment` function, I could reuse it elsewhere
n
ah, fwiw I think asyncio is the easiest way to do
> run flows using run_deployment in parallel

> with a custom `wait_for_deployment` function, I could reuse it elsewhere

`run_deployment` is already such a function 🙂 however if you don't want to use async, you could map a task
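```python
from prefect import flow, task, unmapped
from prefect.deployments import run_deployment

@task
def run_deployment_task(name: str, parameters: dict | None = None):
    # keyword arguments avoid relying on run_deployment's positional order
    return run_deployment(name=name, parameters=parameters)

@flow
def a_bunch():
    params: list[dict] = [...]
    # one task run per parameter set; the deployment name is shared via unmapped
    futures = run_deployment_task.map(name=unmapped("my-flow/deployment"), parameters=params)
    completed_runs = [fut.result() for fut in futures]
```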
b
Ahh okay yeah it does seem like the asyncio method would be the easiest. What happens if one of the flows fails?
n
hey @Marvin can you update this
```python
params_list = [{"x": 42}, ...]
completed_runs = await asyncio.gather(*[run_deployment(name=..., parameters=p) for p in params_list])
```
to use `asyncio.as_completed` so we can check `.is_failed()` on each as it completes?
as_completed should work if you need to react to individual failures, otherwise I think gather takes some `return_exceptions` kwarg to suppress failures
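A rough sketch of the `return_exceptions` route: with `return_exceptions=True`, `asyncio.gather` hands exceptions back in the results list instead of raising the first one, so every run gets to finish before you decide what to do. `flaky_run` is a hypothetical stand-in for `run_deployment`, and the failure condition is made up for the demo.

```python
import asyncio


async def flaky_run(parameters: dict) -> dict:
    # Hypothetical stand-in for run_deployment; fails on negative x for the demo.
    await asyncio.sleep(0.01)
    if parameters["x"] < 0:
        raise RuntimeError(f"run failed for {parameters}")
    return parameters


async def run_all(params_list: list[dict]) -> tuple[list, list]:
    # Exceptions are returned in place of results rather than propagated.
    results = await asyncio.gather(
        *[flaky_run(p) for p in params_list], return_exceptions=True
    )
    errors = [r for r in results if isinstance(r, Exception)]
    succeeded = [r for r in results if not isinstance(r, Exception)]
    return succeeded, errors


succeeded, errors = asyncio.run(run_all([{"x": 1}, {"x": -1}, {"x": 2}]))
```

This only catches exceptions raised by the awaited call itself. If a failed run is reported as a normal return value with a failed state, you would probably still need to inspect each result.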
m
Ah, using `asyncio.as_completed` to handle each task's status as soon as it is done. How delightful. Let me update your code to reflect that.
```python
import asyncio

params_list = [{"x": 42}, ...]

async def run_deployment(name, parameters):
    # Simulate the deployment function, replace with the actual implementation
    ...

async def main():
    tasks = [run_deployment(name=..., parameters=p) for p in params_list]

    for task in asyncio.as_completed(tasks):
        try:
            result = await task
            if result.is_failed():
                print("A task failed:", result)
            else:
                print("Task completed successfully:", result)
        except Exception as e:
            print("A task raised an exception:", e)

# Run the main function
asyncio.run(main())
```
You will need to replace the `...` with appropriate values or functions to reflect the actual deployment logic. Here's the step-by-step explanation:
1. The `params_list` and `run_deployment` function remain unchanged.
2. The `main` function creates the list of tasks.
3. `asyncio.as_completed` is used to iterate over tasks as they complete.
4. For each completed task, check if it failed (`result.is_failed()`) or succeeded.
5. Use `asyncio.run` to run the `main` function.

I hope that's what you're looking for. If you need more specific details or have additional questions, don't hesitate to ask. Sigh.
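To tie this back to the original ask (wait for every run, collect all failures, then raise once), here is one possible sketch. `FakeRun` and `fake_run_deployment` are hypothetical stand-ins so the snippet runs on its own. The `is_failed()` method on the returned object is an assumption carried over from the thread, so check where the real return value of `run_deployment` exposes its state.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeRun:
    # Hypothetical stand-in for whatever run_deployment returns.
    name: str
    failed: bool

    def is_failed(self) -> bool:
        return self.failed


async def fake_run_deployment(name: str, parameters: dict) -> FakeRun:
    # Made-up behavior for the demo: runs with an even x end up failed.
    await asyncio.sleep(0.01 * parameters["x"])
    return FakeRun(name=f"{name}-{parameters['x']}", failed=parameters["x"] % 2 == 0)


async def run_and_check(params_list: list[dict]) -> None:
    coros = [fake_run_deployment("my-flow/deployment", p) for p in params_list]
    failed = []
    # as_completed yields awaitables in the order the runs finish.
    for next_done in asyncio.as_completed(coros):
        run = await next_done
        if run.is_failed():
            failed.append(run.name)
    if failed:
        # Raise only after every run has finished, listing all failures at once.
        raise RuntimeError(f"{len(failed)} run(s) failed: {sorted(failed)}")


try:
    asyncio.run(run_and_check([{"x": 1}, {"x": 2}, {"x": 3}, {"x": 4}]))
    outcome = "ok"
except RuntimeError as exc:
    outcome = str(exc)
```

Collecting the failed names first and raising at the end means one bad run does not hide the others, at the cost of not failing fast.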