https://prefect.io logo
s

skrawczyk

07/07/2023, 5:52 PM
Hey all, I'm looking for a way to dynamically turn off the schedules of old flow deployments using the Python API. For example, lets say I have a flow called
hello_world
with a deployment called
1.0
that is scheduled to run everyday. I already have some code in place that can automatically create a new version of
hello_world
as a deployment named
1.1
that would also run everyday. I want to retroactively go back and turn off the schedule for
hello_world:1.0
so there aren't 2 versions running at the same time. I see that the Deployment class has a
load()
and
update()
function, which would take care of turning off the schedule if I knew which deployment name I needed to turn off. Does anyone know how I can list all deployments a flow has stored and select the most recent or the ones that have active schedules?
1
The top REST API option of
Read Deployment By Name
won't work because it requires both
flow_name
and
deployment_name
. I'm only able to get the flow_name,
deployment_name
is the tricky part that I'm trying to work around as there's no way to tell what it would be in our use case. That's why I'm looking for some sort of function that you could provide
flow_name
to and it gives back all of the current
deployment_name
values
Set Schedule Inactive
probably won't work either as I assume the required
deployment_id
is unique to a flow_name/deployment_name combo, which takes us back to the same issue
I know this was possible in the prefect 1.0 days on the GQL api, not sure if that metadata is still available however
c

Christopher Boyd

07/07/2023, 6:38 PM
There’s a filter for deployments, but I’d need to check what is returned in the body of the request
image.png
s

skrawczyk

07/07/2023, 6:40 PM
Yep! I want to supply the flow name and get all deployment_names under that flow_name
c

Christopher Boyd

07/07/2023, 6:40 PM
Yea, there’s no direct route to do that
I’m checking to see what query the UI does for a flow
👍 1
It’s doing a search on deployments with a flow filter
image.png
Here’s the full curl posting that the UI is doing for this action (flow -> deployments associated):
Copy code
curl '<https://api.prefect.cloud/api/accounts//workspaces//deployments/filter>' \
  -H 'authority: api.prefect.cloud' \
  -H 'accept: application/json, text/plain, */*' \
  -H 'accept-language: en-US,en;q=0.9' \
  -H 'authorization: bearer ' \
  -H 'content-type: application/json' \
  -H 'origin: <https://app.prefect.cloud>' \
  -H 'sec-ch-ua: "Not.A/Brand";v="8", "Chromium";v="114", "Google Chrome";v="114"' \
  -H 'sec-ch-ua-mobile: ?0' \
  -H 'sec-ch-ua-platform: "macOS"' \
  -H 'sec-fetch-dest: empty' \
  -H 'sec-fetch-mode: cors' \
  -H 'sec-fetch-site: same-site' \
  -H 'user-agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36' \
  -H 'x-prefect-ui: true' \
  --data-raw '{"flows":{"id":{"any_":["5f35f85c-a5d1-471b-8da3-b7b22dd29978"]}},"sort":"NAME_ASC"}' \
  --compressed
s

skrawczyk

07/07/2023, 6:49 PM
cool, I can make that work for now. Is there an easy way to retrieve the flow_id using the flow name to pass into this curl?
c

Christopher Boyd

07/07/2023, 6:50 PM
Depends - it’s listed in the UI if I select that flow, otherwise programmatically you’d probably want to return the id with a name filter, or through cli with
prefect flow ls
plenty of ways, just depends on which way you need
Copy code
<https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}/flows/name/{name}>
s

skrawczyk

07/07/2023, 6:58 PM
perfect, thanks Chris!
should be able to make it work this way with a little digging
🙌 1
catjam 1
n

Nico Neumann

07/07/2023, 7:18 PM
What if you use a static name and instead supply the version into the Deployment as parameter? https://docs.prefect.io/2.10.18/api-ref/prefect/deployments/deployments/ And then just overwrite the deployment by applying a new version?
s

skrawczyk

07/07/2023, 7:28 PM
@Christopher Boyd do you have any docs on how to supply a
FlowFilter
object to
prefect.client.read_deployments()
? tried dicts and strings, both threw errors. https://docs.prefect.io/2.10.15/api-ref/prefect/client/orchestration/#prefect.client.orchestration.PrefectClient.read_deployments
c

Christopher Boyd

07/07/2023, 7:29 PM
I have an example I wrote, but I don’t know that we have anything doc’d
Copy code
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName, FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType
import asyncio

async def get_client_response() -> asyncio.coroutine:
    async with get_client() as client:
        response = await client.read_flow_runs(
            deployment_filter=DeploymentFilter(name=DeploymentFilterName
            (like_="default")),
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    type=FlowRunFilterStateType(
                        any_=["FAILED", "COMPLETED"])),
        )
    )
    print(response)
    print(len(response))

asyncio.run(get_client_response())
🙌 1
gratitude thank you 1
s

skrawczyk

07/07/2023, 7:30 PM
@Nico Neumann not sure what you mean, but having a static name in the deployment and then replacing it with a version parameter is what I'm doing to make a new flow deployment. My problem comes from finding old deployments and automatically turning off their schedule in favor of the new deployment taking over
c

Christopher Boyd

07/07/2023, 7:30 PM
I’d have to look and spend a little bit of time with the filters to get a better answer
sorry 😞
s

skrawczyk

07/07/2023, 7:31 PM
no worries, that atleast points me in the right direction
🙌 1
got it done pretty pain-free, thanks again!
Copy code
async def turn_off_old_schedules(flow_name:str):
    async with get_client() as client:

        # get flow_id using the name
        flow_result:flow_api = await client.read_flow_by_name(flow_name)
        flow_id = flow_result.id

        # get any deployment where the schedule is active
        active_flow_deploys = await client.read_deployments(
            flow_filter=FlowFilter(id=FlowFilterId(any_=[flow_id])),
            deployment_filter=DeploymentFilter(is_schedule_active=DeploymentFilterIsScheduleActive(eq_=True)))
        
        print(f"\n Number of active deploys: {len(active_flow_deploys)}\n")

        # checking if there are deploys to turn off
        if bool(active_flow_deploys):
            print(f"""\nTurning off old active schedules for the "{flow_name}" flow\n\n""")
            for deploy in active_flow_deploys:
                flow = Deployment(flow_name=flow_name, name=deploy.name)
                await flow.load()
                flow.is_schedule_active = False
                await flow.apply()
🙌 4
1
c

Christopher Boyd

07/07/2023, 8:26 PM
nicely done!