
Andrew Pruchinski

04/03/2023, 6:50 PM
Hey all. Hoping someone could point me in the right direction. Trying to use run_deployment to kick off one of our deployed flows after an event is triggered within a lambda. I am currently getting a 401 error. I've already added the PREFECT__CLOUD__API_KEY. Anything else I need to do? Any documentation would help too. Thank you
Even using the requests method results in a 401.
Created two different API keys as well, and both of them are returning 401s.

Miguel Moncada

04/12/2023, 12:50 PM
Hi @Andrew Pruchinski, I think that for API call requests you can use the Authorization: Bearer {PREFECT_API_KEY} HTTP header.
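A minimal sketch of attaching that header, using only the standard library so it can be checked without sending anything. The helper name build_authorized_request and the placeholder URL segments are hypothetical, for illustration only; the base URL is the one used in the script later in this thread.

```python
import urllib.request


def build_authorized_request(url: str, api_key: str) -> urllib.request.Request:
    """Build (but do not send) a GET request carrying the key as a Bearer token."""
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {api_key}"},
        method="GET",
    )


if __name__ == "__main__":
    # Placeholder IDs and key: substitute real values before sending anything.
    example_url = (
        "https://api.prefect.cloud/api"
        "/accounts/ACCOUNT_ID/workspaces/WORKSPACE_ID"
        "/deployments/name/FLOW_NAME/DEPLOYMENT_NAME"
    )
    req = build_authorized_request(example_url, "YOUR_API_KEY")
    # Show what would be sent; a 401 usually means this header is missing or wrong.
    print(req.get_method(), req.full_url)
    print(req.header_items())
```

Passing the request to urllib.request.urlopen(req) would actually send it; that step is left out here so the header can be inspected first.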

Andrew Pruchinski

04/12/2023, 1:24 PM
Thank you for the response! I got it working
👌 1

Miguel Moncada

04/12/2023, 1:25 PM
Awesome! For reference only, in case it's helpful to anyone else who needs to trigger the run via the API for any reason:
import os
import requests

PREFECT_WORKSPACE_ID = os.environ.get("PREFECT_WORKSPACE_ID")
PREFECT_ACCOUNT_ID = os.environ.get("PREFECT_ACCOUNT_ID")
PREFECT_API_KEY = os.environ.get("PREFECT_API_KEY")

PREFECT_FLOW_NAME = os.environ.get("FLOW_NAME")
PREFECT_DEPLOYMENT_NAME = os.environ.get("DEPLOYMENT_NAME")

OK_STATUS_CODES = [200, 201]


def get_deployment_id(
    flow_name: str,
    deployment_name: str,
    account_id: str = PREFECT_ACCOUNT_ID,
    workspace_id: str = PREFECT_WORKSPACE_ID,
    api_key: str = PREFECT_API_KEY,
) -> str:
    """Function to get deployment id from Prefect Cloud

    Args:
        flow_name (str): Flow name
        deployment_name (str): Deployment name
        account_id (str, optional): Account ID.
                                    Defaults to PREFECT_ACCOUNT_ID.
        workspace_id (str, optional): Workspace ID.
                                      Defaults to PREFECT_WORKSPACE_ID.
        api_key (str, optional): Prefect API key.
                                 Defaults to PREFECT_API_KEY.

    Raises:
        Exception: If failed to get deployment id

    Returns:
        str: Deployment ID
    """
    deployment_api_url = "https://api.prefect.cloud/api" \
                         f"/accounts/{account_id}" \
                         f"/workspaces/{workspace_id}" \
                         f"/deployments/name/{flow_name}/{deployment_name}"
    headers = {"Authorization": f"Bearer {api_key}"}

    r = requests.get(deployment_api_url, headers=headers)

    if r.status_code in OK_STATUS_CODES:
        deployment = r.json()
        deployment_id = deployment.get('id')
    else:
        raise Exception(f"Failed to get deployment id for {flow_name}/"
                        f"{deployment_name}, status code is: {r.status_code}, "
                        f"error: {r.text}")
    return deployment_id


def run_deployment(
    deployment_id: str,
    account_id: str = PREFECT_ACCOUNT_ID,
    workspace_id: str = PREFECT_WORKSPACE_ID,
    api_key: str = PREFECT_API_KEY
) -> dict:
    deployment_run_api_url = "https://api.prefect.cloud/api/" \
                             f"accounts/{account_id}" \
                             f"/workspaces/{workspace_id}" \
                             f"/deployments/{deployment_id}/create_flow_run"

    headers = {
        "Authorization": f"Bearer {api_key}",
        }
    body = {
        "state": {
            "type": "SCHEDULED",
            "message": "Run from Google Cloud Function",
            "state_details": {}
        },
        "parameters": {
            "env": "staging"
            }
        }

    print(f"Running deployment {deployment_id} with body: {body}")

    r = requests.post(deployment_run_api_url, json=body, headers=headers)

    if r.status_code in OK_STATUS_CODES:
        flow_run = r.json()
        return flow_run
    else:
        return {
            "error": f"Failed to run deployment {deployment_id}",
            "status_code": r.status_code,
            "message": r.text
            }


def main(flow_name: str, deployment_name: str) -> dict:
    print(f"Running deployment {deployment_name} for flow {flow_name}")
    print(f"Using account id: {PREFECT_ACCOUNT_ID}")
    print(f"Using workspace id: {PREFECT_WORKSPACE_ID}")

    deployment_id = get_deployment_id(flow_name, deployment_name)
    flow_run = run_deployment(deployment_id)
    print(flow_run)
    return flow_run


if __name__ == '__main__':
    main(
        flow_name=PREFECT_FLOW_NAME,
        deployment_name=PREFECT_DEPLOYMENT_NAME
    )

Andrew Pruchinski

04/12/2023, 3:46 PM
Thank you so much for that, @Miguel Moncada!
🤗 1

Miguel Moncada

04/12/2023, 3:46 PM
You're welcome!