Daniel Leicht
07/22/2025, 10:34 AM
Marvin
07/22/2025, 10:34 AM
Marvin
07/22/2025, 10:35 AM
```bash
prefect deployment ls --flow-name "your-flow-name"
```
This will show all deployments for the specified flow name.
Key points:
1. The Python approach using the client gives you more programmatic control and detailed information about the deployments
2. The CLI approach is quicker for ad-hoc queries and viewing deployments in a terminal
3. Both methods allow you to filter deployments by flow name
The Python approach is more flexible as it allows you to:
- Process the deployment information programmatically
- Access all deployment attributes
- Integrate the functionality into your scripts or applications
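As a rough illustration of processing deployment information programmatically, the sketch below groups deployments by work queue. The `DeploymentInfo` dataclass and the `group_by_work_queue` helper are hypothetical stand-ins invented for this example so it runs without a Prefect server; with the real client you would pass in the objects returned by `read_deployments` instead.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass
class DeploymentInfo:
    # Hypothetical stand-in for a deployment object; only the two
    # attributes used below are modeled.
    name: str
    work_queue_name: Optional[str]


def group_by_work_queue(deployments):
    """Map each work queue name to the sorted deployment names on it."""
    groups = defaultdict(list)
    for deployment in deployments:
        queue = deployment.work_queue_name or "(unassigned)"
        groups[queue].append(deployment.name)
    return {queue: sorted(names) for queue, names in groups.items()}


sample = [
    DeploymentInfo("hourly", "default"),
    DeploymentInfo("daily", "default"),
    DeploymentInfo("adhoc", None),
]
grouped = group_by_work_queue(sample)
for queue, names in grouped.items():
    print(f"{queue}: {', '.join(names)}")
```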
Links to relevant documentation:
- Prefect API Client Documentation
- REST API - Deployments
Daniel Leicht
07/22/2025, 10:36 AM
Marvin
07/22/2025, 10:36 AM
Marvin
07/22/2025, 10:37 AM
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterName
from prefect.deployments import run_deployment


async def run_flow_deployments(flow_name: str, parameters: dict = None):
    async with get_client() as client:
        # Get all deployments that belong to the flow with this exact name
        flow_filter = FlowFilter(name=FlowFilterName(any_=[flow_name]))
        deployments = await client.read_deployments(flow_filter=flow_filter)

        # Run each deployment
        flow_runs = []
        for deployment in deployments:
            # run_deployment accepts a deployment ID or a
            # "flow-name/deployment-name" string; deployment.name alone is
            # only the second half, so pass the ID
            flow_run = await run_deployment(
                name=deployment.id,
                parameters=parameters,
                timeout=3600,  # Wait up to an hour per run; 0 returns immediately
                client=client,
            )
            flow_runs.append(flow_run)
        return flow_runs


# Usage example:
# Optional: parameters to pass to the flow
parameters = {"param1": "value1", "param2": "value2"}

# Run all deployments for a flow
flow_runs = asyncio.run(run_flow_deployments(
    flow_name="your-flow-name",
    parameters=parameters,
))

# Print results
for flow_run in flow_runs:
    print(f"Flow run {flow_run.id} started for deployment {flow_run.deployment_id}")
    print(f"Status: {flow_run.state.type}")
```
Key points about this approach:
1. The code gets all deployments for a flow and runs each one
2. You can optionally pass parameters to the flow runs
3. Each `run_deployment` call returns a `FlowRun` object that you can use to track the status
4. `run_deployment` identifies a deployment by its ID or by a name in the format "flow-name/deployment-name"
5. You can customize the run with additional parameters like:
- `timeout`: Maximum time to wait for the flow run to complete
- `work_queue_name`: Specify a particular work queue
- `tags`: Add tags to the flow run
- `scheduled_time`: Schedule the flow run for later
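To make the "flow-name/deployment-name" format concrete, here is a small sketch that builds and splits such identifiers. Both helper names are hypothetical and exist only for this example; they are not part of Prefect.

```python
def build_identifier(flow_name: str, deployment_name: str) -> str:
    """Join the two parts into a "flow-name/deployment-name" string."""
    return f"{flow_name}/{deployment_name}"


def split_identifier(identifier: str) -> tuple:
    """Split on the first "/" and reject strings missing either half."""
    flow_name, separator, deployment_name = identifier.partition("/")
    if not separator or not flow_name or not deployment_name:
        raise ValueError(
            f"expected 'flow-name/deployment-name', got {identifier!r}"
        )
    return flow_name, deployment_name


identifier = build_identifier("your-flow-name", "your-deployment-name")
print(identifier)
print(split_identifier(identifier))
```

A bare deployment name such as "your-deployment-name" fails the check in `split_identifier`, which mirrors why the flow name has to be part of the string.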
If you only want to get the deployments without running them, you can simply use the first part of the code:
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterName


async def get_flow_deployments(flow_name: str):
    async with get_client() as client:
        # Match deployments whose parent flow has this exact name
        flow_filter = FlowFilter(name=FlowFilterName(any_=[flow_name]))
        return await client.read_deployments(flow_filter=flow_filter)


# Usage
deployments = asyncio.run(get_flow_deployments("your-flow-name"))
for deployment in deployments:
    print(f"Found deployment: {deployment.name}")
```
Links to relevant documentation:
- Prefect Deployments Documentation
- API Reference - run_deployment
Daniel Leicht
07/22/2025, 11:04 AM
Marvin
07/22/2025, 11:05 AM
Marvin
07/22/2025, 11:06 AM
```python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterName


def get_flow_deployments(flow_name: str):
    # get_client() returns the async client by default; sync_client=True
    # requests the synchronous one. This needs a Prefect release whose
    # synchronous client exposes read_deployments.
    with get_client(sync_client=True) as client:
        # Create a filter for deployments that belong to the named flow
        flow_filter = FlowFilter(name=FlowFilterName(any_=[flow_name]))
        # Read deployments using the filter
        return client.read_deployments(flow_filter=flow_filter)


# Usage example - no async/await needed
deployments = get_flow_deployments("your-flow-name")
for deployment in deployments:
    print(f"Deployment name: {deployment.name}")
    print(f"Deployment ID: {deployment.id}")
    print(f"Work queue: {deployment.work_queue_name}")
    print("---")
```
And if you want to run the deployments synchronously:
```python
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowFilter, FlowFilterName
from prefect.deployments import run_deployment


def run_flow_deployments(flow_name: str, parameters: dict = None):
    # Get all deployments for the flow
    with get_client(sync_client=True) as client:
        flow_filter = FlowFilter(name=FlowFilterName(any_=[flow_name]))
        deployments = client.read_deployments(flow_filter=flow_filter)

    # Run each deployment
    flow_runs = []
    for deployment in deployments:
        # Pass the deployment ID; deployment.name alone is not the full
        # "flow-name/deployment-name" string. Called from synchronous code,
        # run_deployment creates its own client, so none is passed in.
        flow_run = run_deployment(
            name=deployment.id,
            parameters=parameters,
            timeout=3600,  # Optional: adjust timeout as needed
        )
        flow_runs.append(flow_run)
    return flow_runs


# Usage example
parameters = {"param1": "value1", "param2": "value2"}
flow_runs = run_flow_deployments("your-flow-name", parameters)
for flow_run in flow_runs:
    print(f"Flow run {flow_run.id} started for deployment {flow_run.deployment_id}")
    print(f"Status: {flow_run.state.type}")
```
Key differences from the async version:
1. No need for `async`/`await` keywords
2. No need for `asyncio.run()`
3. No async context manager (`async with`); a plain `with` block is enough
4. The client is obtained with `get_client(sync_client=True)`; a bare `get_client()` returns the async client
5. Methods are called directly without `await`
The synchronous version is simpler to use if you're working in a synchronous context or don't need async functionality. However, if you're working with other async code or need to handle many concurrent operations, the async version might be more appropriate.
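To illustrate the point about concurrent operations, here is a minimal sketch using only the standard library. `trigger_run` is a hypothetical stand-in for an awaited API call (for example, starting one flow run); it is not a Prefect function, and the deployment names are made up.

```python
import asyncio


async def trigger_run(deployment_name: str, delay: float = 0.05) -> str:
    # Hypothetical stand-in for an awaited network call.
    await asyncio.sleep(delay)
    return f"run-for-{deployment_name}"


async def trigger_all(names):
    # gather() schedules every coroutine at once and returns the results
    # in the same order as the inputs.
    return await asyncio.gather(*(trigger_run(name) for name in names))


names = ["etl/daily", "etl/hourly", "etl/backfill"]
results = asyncio.run(trigger_all(names))
print(results)
```

With a synchronous loop the three waits would add up; here they overlap, so the total wait is roughly that of the slowest call.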
Links to relevant documentation:
- Prefect API Client Documentation
- REST API - Deployments
Daniel Leicht
07/22/2025, 11:11 AM
Marvin
07/22/2025, 11:11 AM
Marvin
07/22/2025, 11:12 AM
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.utilities.asyncutils import sync_compatible


# Method 1: Using the sync_compatible decorator
@sync_compatible
async def get_deployment(flow_filter):
    async with get_client() as client:
        deployments = await client.read_deployments(flow_filter=flow_filter)
        return deployments[0] if deployments else None


# Usage (from synchronous code):
prefect_flow_deployment = get_deployment(flow_filter)


# Method 2: Using asyncio.run
async def _get_deployment(flow_filter):
    async with get_client() as client:
        deployments = await client.read_deployments(flow_filter=flow_filter)
        return deployments[0] if deployments else None


# Usage (from code with no event loop already running):
prefect_flow_deployment = asyncio.run(_get_deployment(flow_filter))
```
Or, if you want a more direct approach that handles everything in one place:
```python
import asyncio

from prefect.client.orchestration import get_client


def get_deployment(flow_filter):
    async def _get():
        async with get_client() as client:
            deployments = await client.read_deployments(flow_filter=flow_filter)
            return deployments[0] if deployments else None

    # asyncio.run starts an event loop, runs the coroutine, and returns its result
    return asyncio.run(_get())


# Usage:
prefect_flow_deployment = get_deployment(flow_filter)
```
The error you encountered happened because:
1. `read_deployments()` on the default (async) client returns a coroutine that needs to be awaited
2. Indexing `[0]` directly on a coroutine object isn't possible
Using either `sync_compatible` or `asyncio.run` properly handles the async-to-sync conversion, allowing you to work with the actual deployment results.
This is a common pattern in Prefect 3.x where many client operations are fundamentally async but can be used synchronously with these utilities. The `sync_compatible` decorator is particularly useful as it makes the function usable in both sync and async contexts.
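The coroutine-indexing error can be reproduced without Prefect. In this sketch `read_deployments` is a hypothetical stand-in for any async client method, not the real one, and the deployment names are invented.

```python
import asyncio


async def read_deployments():
    # Hypothetical stand-in for an async client call that returns a list.
    await asyncio.sleep(0)
    return ["deployment-a", "deployment-b"]


def first_deployment_wrong():
    coro = read_deployments()  # a coroutine object; nothing has run yet
    try:
        return coro[0]  # coroutines do not support indexing
    except TypeError as exc:
        return f"TypeError: {exc}"
    finally:
        coro.close()  # avoids a "never awaited" warning


def first_deployment_right():
    # Drive the coroutine to completion first, then index the result.
    deployments = asyncio.run(read_deployments())
    return deployments[0] if deployments else None


print(first_deployment_wrong())
print(first_deployment_right())
```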
Links to relevant documentation:
- Prefect Async Utilities
- Prefect Client API Reference