<@ULVA73B9P> prefect v2 example of running a deplo...
# ask-marvin
m
@Marvin prefect v2 example of running a deployment async and waiting
n
not sure where marvin went, but
run_deployment
can be used as an async method
flow_run = await run_deployment(...)
by default it will wait until the flow run is over, otherwise you can set
timeout=0
to return immediately without waiting
m
To run and wait for a deployment asynchronously in Prefect 2.x, you can use the
run_deployment
function along with
wait_for_flow_run
. Below is an example pattern based on available resources and issues discussed in the community. 1. Running an Async Deployment: - Use the
run_deployment
function to execute a deployment. This function is typically used in an asynchronous context within a Prefect flow. 2. Waiting for Completion: - After starting the deployment, you can use
wait_for_flow_run
to pause execution until the deployment finishes. Here's a simple workflow with these elements:
Copy code
python
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment, wait_for_flow_run
import asyncio

@flow
async def parent_flow():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Running child deployment asynchronously...")
    # Kick off the deployment
    deployment_run = await run_deployment("child-flow/my_deployment")
    
    # Wait for the deployment to complete
    await wait_for_flow_run(deployment_run.id)
    <http://logger.info|logger.info>("Child deployment completed!")

# Entry point
if __name__ == "__main__":
    asyncio.run(parent_flow())
Points to Consider: - Ensure that both your
parent_flow
and
child-flow
are defined as deployment scripts with proper registration and each are asynchronously configured. - Be mindful of any race conditions when dealing with sub-deployments. If a deployment is waiting on a parent or sibling deployment to finish, ensure that these dependencies are managed correctly. - I found a known issue with the
--wait
option in Prefect CLI, which might fail the CI process even when the flow runs successfully. This issue may be relevant if using CLI-based solutions (GitHub Issue). For more detailed examples and documentation, you may visit Prefect 2.x documentation on deployments. Keep in mind that Prefect's API and behavior may change, and referring to the official latest documentation is always a good practice.
m
@Marvin multiple run_deployments and wait for all of them. get states of each of them. throw error if any states are not COMPLETED
m
To run multiple deployments simultaneously and wait for all of them to complete while checking their states, you can use Prefect's
run_deployment
for each and
wait_for_flow_run
to ensure they complete. After waiting, check their states and throw an error if any aren't
COMPLETED
. Here is an outline for achieving this with Prefect 2.x: 1. Run Deployments Asynchronously: - Launch each deployment with
run_deployment
in an asynchronous manner. 2. Wait For Completion: - Use
wait_for_flow_run
to ensure each deployment completes. 3. Check States: - Fetch the current state of each deployment and validate if they are
COMPLETED
. - If any deployment state is not
COMPLETED
, raise an error. Here's how you might handle it in a flow:
Copy code
python
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment, wait_for_flow_run
from prefect.client import get_client
import asyncio

@flow
async def manage_deployments():
    logger = get_run_logger()
    client = get_client()

    # Define your deployment names
    deployment_names = [
        "child-flow/deployment_one",
        "child-flow/deployment_two",
        # Add more deployments as needed
    ]

    # Run deployments asynchronously
    deployment_runs = await asyncio.gather(
        *[run_deployment(deployment_name) for deployment_name in deployment_names]
    )

    # Wait for all deployments to complete
    states = await asyncio.gather(
        *[wait_for_flow_run(run.id) for run in deployment_runs]
    )

    # Check if all deployments completed successfully
    for state, run in zip(states, deployment_runs):
        if state.is_completed():
            <http://logger.info|logger.info>(f"Deployment {run.id} completed successfully.")
        else:
            logger.error(f"Deployment {run.id} failed with state: {state.type}")
            raise RuntimeError(f"Deployment {run.id} did not complete successfully.")

# Entry point
if __name__ == "__main__":
    asyncio.run(manage_deployments())
Notes: - Make sure Prefect server or agent is running and deployments are registered and ready. - If dependencies among deployments exist, handle them accordingly. - This example uses
asyncio.gather
to efficiently manage multiple asynchronous tasks. - Refer to Prefect 2.x deployments documentation for more details. This code snippet provides a structured way to handle multiple deployments while ensuring their successful execution in Prefect 2.x.