Patrick Tan
08/10/2022, 5:54 PMOscar Björhn
08/10/2022, 5:59 PMasync def default_test():
async with get_client() as client:
deployments = await client.read_deployments()
for deployment in deployments:
print(deployment.name)
print(deployment.id)
Oscar Björhn
08/10/2022, 6:00 PMMason Menges
08/10/2022, 6:02 PMfrom prefect.orion.schemas.filters import DeploymentFilterName
filter_name = DeploymentFilterName(["deploymentName"])
async def default_test():
async with get_client() as client:
deployments = await client.read_deployments(deployment_filter=filter_name)
Patrick Tan
08/10/2022, 7:18 PMasync def get_deployment_id(flow_name, deployment_name):
async with get_client() as client:
flows = await client.read_flows()
for flow in flows:
if flow.name == flow_name:
flow_id = flow.id
deployments = await client.read_deployments()
for deployment in deployments:
if deployment.flow_id == flow_id:
if deployment.name == deployment_name:
return deployment.id