Andrew Pruchinski
04/03/2023, 6:50 PMrun_deployment
to kick off one of our deployed flows after an event is triggered within a lambda. I am currently getting a 401 error. I've already added the PREFECT__CLOUD__API_KEY
. Anything else I need to do? Any documentation would help too. Thank youMiguel Moncada
04/12/2023, 12:50 PMAuthorization: Bearer {PREFECT_API_KEY}
HTTP header.Andrew Pruchinski
04/12/2023, 1:24 PMMiguel Moncada
04/12/2023, 1:25 PMimport os
import requests
PREFECT_WORKSPACE_ID = os.environ.get("PREFECT_WORKSPACE_ID")
PREFECT_ACCOUNT_ID = os.environ.get("PREFECT_ACCOUNT_ID")
PREFECT_API_KEY = os.environ.get("PREFECT_API_KEY")
PREFECT_FLOW_NAME = os.environ.get("FLOW_NAME")
PREFECT_DEPLOYMENT_NAME = os.environ.get("DEPLOYMENT_NAME")
OK_STATUS_CODES = [200, 201]
def get_deployment_id(
flow_name: str,
deployment_name: str,
account_id: str = PREFECT_ACCOUNT_ID,
workspace_id: str = PREFECT_WORKSPACE_ID,
api_key: str = PREFECT_API_KEY,
) -> str:
"""Function to get deployment id from Prefect Cloud
Args:
flow_name (str): Flow name
deployment_name (str): Deployment name
account_id (str, optional): Account ID.
Defaults to PREFECT_ACCOUNT_ID.
workspace_id (str, optional): Workspace ID.
Defaults to PREFECT_WORKSPACE_ID.
api_key (str, optional): Prefect API key.
Defaults to PREFECT_API_KEY.
Raises:
Exception: If failed to get deployment id
Returns:
str: Deployment ID
"""
deployment_api_url = "<https://api.prefect.cloud/api>" \
f"/accounts/{account_id}" \
f"/workspaces/{workspace_id}" \
f"/deployments/name/{flow_name}/{deployment_name}"
headers = {"Authorization": f"Bearer {api_key}"}
r = requests.get(deployment_api_url, headers=headers)
if r.status_code in OK_STATUS_CODES:
deployment = r.json()
deployment_id = deployment.get('id')
else:
raise Exception(f"Failed to get deployment id for {flow_name}/"
f"{deployment_name}, status code is: {r.status_code}, "
f"error: {r.text}")
return deployment_id
def run_deployment(
deployment_id: str,
account_id: str = PREFECT_ACCOUNT_ID,
workspace_id: str = PREFECT_WORKSPACE_ID,
api_key: str = PREFECT_API_KEY
) -> dict:
deployment_run_api_url = "<https://api.prefect.cloud/api/>" \
f"accounts/{account_id}" \
f"/workspaces/{workspace_id}" \
f"/deployments/{deployment_id}/create_flow_run"
headers = {
"Authorization": f"Bearer {api_key}",
}
body = {
"state": {
"type": "SCHEDULED",
"message": "Run from Google Cloud Function",
"state_details": {}
},
"parameters": {
"env": "staging"
}
}
print(f"Running deployment {deployment_id} with body: {body}")
r = <http://requests.post|requests.post>(deployment_run_api_url, json=body, headers=headers)
if r.status_code in OK_STATUS_CODES:
flow_run = r.json()
return flow_run
else:
return {
"error": f"Failed to run deployment {deployment_id}",
"status_code": r.status_code,
"message": r.text
}
def main(flow_name: str, deployment_name: str) -> dict:
print(f"Running deployment {deployment_name} for flow {flow_name}")
print(f"Using account id: {PREFECT_ACCOUNT_ID}")
print(f"Using workspace id: {PREFECT_WORKSPACE_ID}")
deployment_id = get_deployment_id(flow_name, deployment_name)
flow_run = run_deployment(deployment_id)
print(flow_run)
return flow_run
if __name__ == '__main__':
main(
flow_name=PREFECT_FLOW_NAME,
deployment_name=PREFECT_DEPLOYMENT_NAME
)
Andrew Pruchinski
04/12/2023, 3:46 PMMiguel Moncada
04/12/2023, 3:46 PM