Mark Reyes
10/07/2024, 4:34 PMNate
10/07/2024, 4:54 PMrun_deployment
can be used as an async method
flow_run = await run_deployment(...)
by default it will wait until the flow run is over, otherwise you can set timeout=0
to return immediately without waitingMarvin
10/07/2024, 5:23 PMrun_deployment
function along with wait_for_flow_run
. Below is an example pattern based on available resources and issues discussed in the community.
1. Running an Async Deployment:
- Use the run_deployment
function to execute a deployment. This function is typically used in an asynchronous context within a Prefect flow.
2. Waiting for Completion:
- After starting the deployment, you can use wait_for_flow_run
to pause execution until the deployment finishes.
Here's a simple workflow with these elements:
python
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment, wait_for_flow_run
import asyncio
@flow
async def parent_flow():
logger = get_run_logger()
<http://logger.info|logger.info>("Running child deployment asynchronously...")
# Kick off the deployment
deployment_run = await run_deployment("child-flow/my_deployment")
# Wait for the deployment to complete
await wait_for_flow_run(deployment_run.id)
<http://logger.info|logger.info>("Child deployment completed!")
# Entry point
if __name__ == "__main__":
asyncio.run(parent_flow())
Points to Consider:
- Ensure that both your parent_flow
and child-flow
are defined as deployment scripts with proper registration and each are asynchronously configured.
- Be mindful of any race conditions when dealing with sub-deployments. If a deployment is waiting on a parent or sibling deployment to finish, ensure that these dependencies are managed correctly.
- I found a known issue with the --wait
option in Prefect CLI, which might fail the CI process even when the flow runs successfully. This issue may be relevant if using CLI-based solutions (GitHub Issue).
For more detailed examples and documentation, you may visit Prefect 2.x documentation on deployments. Keep in mind that Prefect's API and behavior may change, and referring to the official latest documentation is always a good practice.Mark Reyes
10/07/2024, 9:15 PMMarvin
10/07/2024, 9:15 PMrun_deployment
for each and wait_for_flow_run
to ensure they complete. After waiting, check their states and throw an error if any aren't COMPLETED
.
Here is an outline for achieving this with Prefect 2.x:
1. Run Deployments Asynchronously:
- Launch each deployment with run_deployment
in an asynchronous manner.
2. Wait For Completion:
- Use wait_for_flow_run
to ensure each deployment completes.
3. Check States:
- Fetch the current state of each deployment and validate if they are COMPLETED
.
- If any deployment state is not COMPLETED
, raise an error.
Here's how you might handle it in a flow:
python
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment, wait_for_flow_run
from prefect.client import get_client
import asyncio
@flow
async def manage_deployments():
logger = get_run_logger()
client = get_client()
# Define your deployment names
deployment_names = [
"child-flow/deployment_one",
"child-flow/deployment_two",
# Add more deployments as needed
]
# Run deployments asynchronously
deployment_runs = await asyncio.gather(
*[run_deployment(deployment_name) for deployment_name in deployment_names]
)
# Wait for all deployments to complete
states = await asyncio.gather(
*[wait_for_flow_run(run.id) for run in deployment_runs]
)
# Check if all deployments completed successfully
for state, run in zip(states, deployment_runs):
if state.is_completed():
<http://logger.info|logger.info>(f"Deployment {run.id} completed successfully.")
else:
logger.error(f"Deployment {run.id} failed with state: {state.type}")
raise RuntimeError(f"Deployment {run.id} did not complete successfully.")
# Entry point
if __name__ == "__main__":
asyncio.run(manage_deployments())
Notes:
- Make sure Prefect server or agent is running and deployments are registered and ready.
- If dependencies among deployments exist, handle them accordingly.
- This example uses asyncio.gather
to efficiently manage multiple asynchronous tasks.
- Refer to Prefect 2.x deployments documentation for more details.
This code snippet provides a structured way to handle multiple deployments while ensuring their successful execution in Prefect 2.x.