Hi everyone! Just wanted to know if there is a way...
# ask-community
s
Hi everyone! Just wanted to know if there is a way to trigger deployments using raw API requests instead of using run_deployment from prefect. The problem is that I have an application that depends on a pydantic version the prefect-client is incompatible with, so I'm looking to see whether I can make an API call to the Prefect Server and run the deployment instead of using the utility from the prefect-client. Any help or guidance is appreciated. Thanks
a
👋 ! Which pydantic version are you on?
(the answer is yes btw -- will try to dig up the exact url but just curious about your version)
s
My current application needs pydantic<=2.4, whereas prefect needs >=2.7
Actually, I went through the REST API docs and found a way around this
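(For anyone following the raw-API route: you also need a deployment id, which can be looked up over the REST API as well. The sketch below is a rough illustration, assuming a Prefect 2.x server at a placeholder base URL and the read-deployment-by-name route; double-check the exact path against your server's interactive API docs.)
Copy code
# Rough sketch (not from the thread): resolve a deployment id by flow + deployment name
# over the REST API. The base URL and names below are placeholders; verify the
# /api/deployments/name/{flow_name}/{deployment_name} route against your Prefect server version.
import requests

PREFECT_SERVER_URL = "http://127.0.0.1:4200"  # placeholder, base URL without /api


def get_deployment_id_by_name(flow_name: str, deployment_name: str) -> str:
    url = f"{PREFECT_SERVER_URL}/api/deployments/name/{flow_name}/{deployment_name}"
    response = requests.get(url)
    response.raise_for_status()
    return response.json()["id"]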
a
oh fascinating what broke in >2.4?
s
Basically SQLModel was breaking after 2.4
a
We're pinning to 2.7 just to get access to its Secret class without having to implement our own, but it's not worth it if we're going to brick you from the client
oh fascinating. honestly I'm happy just backporting their Secret class so we can relax the pin
do you know if there's an issue on SQLModel where this is getting tracked?
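(On the Secret idea above: a backport can be as small as a thin wrapper that masks the value in repr/str and exposes it through an explicit accessor. The sketch below is illustrative only, not pydantic's actual implementation; the class and method names simply mirror pydantic's Secret / get_secret_value convention.)
Copy code
# Illustrative sketch of a minimal Secret-style wrapper (not pydantic's implementation).
from typing import Generic, TypeVar

T = TypeVar("T")


class Secret(Generic[T]):
    def __init__(self, value: T) -> None:
        self._value = value

    def get_secret_value(self) -> T:
        # Explicit accessor so the raw value never leaks through repr/str/logs by accident.
        return self._value

    def __repr__(self) -> str:
        return "Secret('**********')"

    __str__ = __repr__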
s
I'm not sure about the issue, but I was getting an error (lost the snippet of it, sorry). It happened because I brute-forced the install of the pydantic version prefect needed, and SQLModel was failing because of that
a
no worries, i'll make an issue for prefect to track this
👍 1
s
Copy code
import os
from typing import Dict, Optional

import requests

# DEFAULT_STATE, DEFAULT_WORK_QUEUE_NAME and DEFAULT_EMPIRICAL_POLICY are
# module-level defaults defined elsewhere in the application.


def create_flow_run_by_deployment(
    deployment_id: str,
    name: str,
    parameters: dict,
    state: Optional[Dict] = DEFAULT_STATE,
    work_queue_name: Optional[str] = DEFAULT_WORK_QUEUE_NAME,
    empirical_policy: Optional[Dict] = DEFAULT_EMPIRICAL_POLICY,
):
    payload = {
        "name": name,
        "parameters": parameters,
        "state": state,
        "work_queue_name": work_queue_name,
        "empirical_policy": empirical_policy,
    }

    # PREFECT_API_URL here is the base server URL (no trailing /api), e.g. http://127.0.0.1:4200
    base_url = os.environ["PREFECT_API_URL"].rstrip("/")
    url = f"{base_url}/api/deployments/{deployment_id}/create_flow_run"

    response = requests.post(url, json=payload)

    return response.json()
This is the function I wrote to trigger a deployment. Putting it here so that anyone looking for a similar solution can use it.
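(A minimal usage sketch for the function above; the deployment id, run name, and parameters are placeholders, and the DEFAULT_* values are assumed to be module-level defaults defined alongside the function.)
Copy code
# Placeholder values, for illustration only.
result = create_flow_run_by_deployment(
    deployment_id="00000000-0000-0000-0000-000000000000",  # replace with a real deployment UUID
    name="manual-run-via-rest",
    parameters={"customer_id": 42},
)

# Assumes the server responds with the created flow run object.
print(result["id"], result["state"]["type"])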
a
🙇
Thanks Sam!
🙌 1
o
I wrote a similar function a couple of years back that we use in our project. It does the same thing as yours but with a few improvements (error handling, etc.) in case you want to poll the created run or that sort of thing. Not sure if it's of any use to you, but here it is anyway!
Copy code
import asyncio
from typing import Any, Dict
from uuid import UUID

from httpx import ReadTimeout
from prefect import get_client, get_run_logger, task
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.objects import State, StateType
from prefect.states import Failed

# Import paths above are for Prefect 2.x. Env (our environment enum) and
# TERMINAL_STATES (the set of terminal StateTypes we check against) are
# defined elsewhere in our project.


async def _get_deployment_id(client: PrefectClient, flow_name: str, env: Env) -> UUID:
    deployments = await client.read_deployments()

    for deployment in deployments:
        if deployment.name == f"{flow_name} ({env.value})":
            return deployment.id

    raise LookupError(f"No deployment found for {flow_name} ({env.value})!")

@task(task_run_name="Flow {flow_name}")
async def run_deployment(flow_name: str, env: Env, run_name: str, parameters: Dict[str, Any] | None = None, fire_and_forget: bool = False, blocking: bool = True) -> State | None:
    """Creates a flow run from a deployment and polls it until it reaches a terminal state.

    If fire and forget is true, no polling is performed and the function exits early.
    """
    logger = get_run_logger()

    async with get_client() as client:
        deployment_id = await _get_deployment_id(client=client, flow_name=flow_name, env=env)

        flow_run = await client.create_flow_run_from_deployment(deployment_id=deployment_id, name=run_name, parameters=parameters)
        flow_run_id = flow_run.id

        logger.info(f"Created flow run for flow {flow_name} with name: {flow_run.name} and id: {flow_run.id}")

        retry_counter_prefect_api = 0
        retry_counter_infra = 0

        # If fire_and_forget is true, don't poll, just return.
        while not fire_and_forget:
            try:
                flow_run = await client.read_flow_run(flow_run_id=flow_run_id)
            except ReadTimeout:
                if retry_counter_prefect_api >= 5:
                    raise

                retry_counter_prefect_api = retry_counter_prefect_api + 1
                logger.warning(f"Timed out while reading flow run {flow_name}, retrying in 5 mins. This is retry attempt {retry_counter_prefect_api} / 5")
                await asyncio.sleep(60 * 5)
                continue

            if flow_run.state_type in TERMINAL_STATES:
                if flow_run.state_type == StateType.COMPLETED:
                    logger.info(f"Child flow run {flow_name} / {flow_run.name} completed successfully.")
                elif flow_run.state_type == StateType.CRASHED:
                    if retry_counter_infra >= 0:  # Was originally set to 3. Disabled by setting it to 0 on 2024-05-12. We can't use it with ACI.
                        logger.error(f"Child flow run {flow_name} / {flow_run.name} crashed due to infrastructure issues.")

                        if blocking:
                            return Failed()

                        return None

                    retry_counter_infra = retry_counter_infra + 1
                    logger.warning(
                        f"Child flow run {flow_name} / {flow_run.name} crashed due to infrastructure failure, retrying in 5 mins. This is retry attempt {retry_counter_infra} / 3"
                    )

                    # Wait a little bit, in case there's a temporary outage in Azure.
                    await asyncio.sleep(60 * 5)

                    # Create a new flow run, the previous one crashed.
                    flow_run = await client.create_flow_run_from_deployment(deployment_id=deployment_id, name=run_name, parameters=parameters)
                    flow_run_id = flow_run.id

                    logger.info(f"The retried flow run for flow {flow_name} with name: {flow_run.name} has a new id: {flow_run.id}")

                    # Reset retry_counter_prefect_api since we've started a whole new flow run.
                    retry_counter_prefect_api = 0

                    continue
                elif flow_run.state_type in (StateType.CANCELLED, StateType.FAILED):
                    logger.error(f"Child flow run {flow_name} / {flow_run.name} exited non-successfully.")

                    if blocking:
                        return Failed()
                else:
                    logger.critical(f"Encountered an unknown terminal state in child flow run {flow_name}: {flow_run.state_type}")

                    if blocking:
                        return Failed()

                return None

            logger.info(f"Waiting for child flow run {flow_name} / {flow_run.name} with state {flow_run.state_type}")

            await asyncio.sleep(30)

    return None
🔥 2
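(A minimal sketch of how the task above might be called from a parent flow; the flow name, Env member, and run names are placeholders, and Env is the project's own enum from the snippet.)
Copy code
# Minimal sketch: calling the run_deployment task above from a parent flow.
# "nightly-etl", Env.PROD and the run names are placeholders; Env comes from the project.
from prefect import flow


@flow
async def orchestrate_nightly_runs() -> None:
    # Default behaviour: block until the child deployment run reaches a terminal state.
    await run_deployment(
        flow_name="nightly-etl",
        env=Env.PROD,
        run_name="nightly-etl-blocking",
        parameters={"full_refresh": False},
    )

    # Fire-and-forget: create the run and return immediately, without polling.
    await run_deployment(
        flow_name="nightly-etl",
        env=Env.PROD,
        run_name="nightly-etl-fire-and-forget",
        fire_and_forget=True,
    )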
I know Prefect added something similar a while back, but it had some issues: it wouldn't let you run deployments on separate infra (maybe that got fixed?), the DAG it created was sub-optimal, etc. (I think there was a new subflow created per poll)... I don't recall exactly, I should probably try it out again. 😅
😂 1
a
If you kick the tires and still feel like your home-rolled implementation is better, HMU so I can steal your implementation and put it in prefect 😂
o
If you think the snippet contains some features that are useful for other people, feel free to grab it and do whatever you want with it! I've shared it a couple of times before; in fact, I think the run_deployment name in Prefect is based on the name of this function from when I first shared it with Anna Geller back in the day, before there was an official version 😄 It's been so long that I don't remember exactly what my issue with the official implementation was, but these are our requirements, and at least some of them weren't supported:
1. A nice DAG, we don't want a subflow per poll
2. Needs to be able to run multiple deployments concurrently (I think this one didn't work?)
3. Needs to run each deployment on separate infra, at least as an option
💅 1
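(For reference, the built-in helper lives at prefect.deployments.run_deployment. A minimal sketch under Prefect 2.x, with a placeholder "my-flow/my-deployment" deployment name; timeout=0 gives the fire-and-forget behaviour, and newer releases also have an as_subflow flag that may address the extra-subflow concern, so check your version's docs.)
Copy code
# Minimal sketch of the built-in helper; "my-flow/my-deployment" and the parameters are placeholders.
from prefect import flow
from prefect.deployments import run_deployment


@flow
def parent():
    # Blocks until the child run reaches a terminal state (polls the API under the hood).
    finished_run = run_deployment(
        name="my-flow/my-deployment",
        parameters={"customer_id": 42},
    )

    # Fire-and-forget: timeout=0 returns as soon as the run is created, without waiting.
    run_deployment(
        name="my-flow/my-deployment",
        parameters={"customer_id": 42},
        timeout=0,
    )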
a
🙇 thanks, thanks, will take a look!
👍 1