Jake

03/03/2022, 2:43 PM
We have a parameter that gets passed to some of our tasks (like which DB endpoint to point to); when this value changes, re-registration won’t happen (since it doesn’t count as any of the metadata) but how can I make it so that this change does trigger a re-registration?

Anna Geller

03/03/2022, 2:45 PM
Can you share more about this YAML file process? I'm not sure how this works.
I'd be curious to see how you pass the parameter values for the flow run - are you attaching parameters to your clock and schedule?

Jake

03/03/2022, 2:48 PM
we have a function:
def get_flow(general_configs):
    with Flow(...) as flow:
        some_task(general_configs)
    return flow
where general_configs is an object that is generated from the YAML file
We call this get_flow function when trying to register

Anna Geller

03/03/2022, 2:55 PM
And the YAML file? 😄 So far, this function doesn't tell me anything about how parameters are passed to it, which we'd need to know to force registration when those parameter values change. Perhaps you mean something else when you say "parameters"? When I hear Parameter, I'm thinking of the Prefect Parameter task.
"We call this get_flow function when trying to register"
Can you share how you do it? This would be important to see before we can make any recommendations about the registration process.

Jake

03/03/2022, 3:03 PM
The YAML file is not really relevant; it is read before we start building the flow, and the flow’s tasks only receive values read from the file. When I say parameter I mean something like:
@task
def add(x, y=1):
    return x + y
so x and y would be parameters (sorry if this is the wrong term to be using).
👍 1

Anna Geller

03/03/2022, 3:04 PM
and really if you do:
flow = get_flow("/path/to/config.yaml")
flow.register("project_name")
then by default this will trigger a flow registration and bump up the flow version - always, any time you run it, regardless of whether something in the YAML changed or not

Jake

03/03/2022, 3:04 PM
In the function that is using get_flow, we then do:
flow.register(
    project_name=prefect_config["project_name"],
    labels=prefect_config.get("labels"),
    idempotency_key=flow.serialized_hash(),
)

Anna Geller

03/03/2022, 3:06 PM
throw out this argument and you're done 🙂
idempotency_key=flow.serialized_hash(),
Doing this instead will always register the flow and bump up the flow version:
flow.register(
    project_name=prefect_config["project_name"],
    labels=prefect_config.get("labels"),
)

Jake

03/03/2022, 3:06 PM
ideally I do not want it to re-register every time (and bump the version). Is this possible?

Anna Geller

03/03/2022, 3:13 PM
OK, sorry, I now get it. You need to compute the hash of your config dictionary instead of your flow and pass it as idempotency_key. This way, if anything in your config has changed since the last registration, this will increment the flow version.
Is general_configs just a path or a dictionary? If it's a dict, you could do:
import hashlib
import json

flow_idempotency_key = hashlib.sha256(
    json.dumps(general_configs, sort_keys=True).encode()
).hexdigest()

flow.register(
    project_name=prefect_config["project_name"],
    labels=prefect_config.get("labels"),
    idempotency_key=flow_idempotency_key,
)
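If general_configs is an object rather than a plain dict, it has to be turned into something JSON-serializable before it can be hashed this way. A minimal sketch, assuming (hypothetically) that the config is a dataclass; the GeneralConfigs class and its fields are invented for illustration and are not from the thread:

```python
import hashlib
import json
from dataclasses import asdict, dataclass


@dataclass
class GeneralConfigs:
    # Hypothetical fields; the real config object is not shown in the thread.
    db_endpoint: str
    batch_size: int = 100


def config_hash(config: GeneralConfigs) -> str:
    """Return a SHA-256 hex digest that changes only when a config value changes."""
    # sort_keys=True keeps the JSON text, and so the hash, independent of key order.
    payload = json.dumps(asdict(config), sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


same_a = config_hash(GeneralConfigs(db_endpoint="db-prod"))
same_b = config_hash(GeneralConfigs(db_endpoint="db-prod", batch_size=100))
changed = config_hash(GeneralConfigs(db_endpoint="db-staging"))
```

Here same_a and same_b are equal because the values match, while changed differs because the endpoint differs, which is the behaviour wanted from an idempotency key.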

Jake

03/03/2022, 3:16 PM
that could work!! basically make that dict into part of the hash right?

Anna Geller

03/03/2022, 3:18 PM
Correct! Instead of using the hash of your serialized flow as an idempotency key, you are using the hash of your config dict. And you can build the idempotency key as you wish - you could even combine it with other flow metadata you care about and build a single dictionary of all arguments that you consider relevant as part of a flow version.
Btw, in Orion, versioning will get even easier, since you will be able to provide a custom flow version as you wish, e.g.:
@flow(name="versioned_flow", version=os.getenv("GIT_COMMIT_SHA"))
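For the Prefect 1 approach discussed above, combining the config with other flow metadata into one key could look roughly like the following. This is only a sketch: build_idempotency_key is a hypothetical helper name, and default=str is an assumption that stringifying non-JSON values (paths, dates) is acceptable for hashing purposes.

```python
import hashlib
import json


def build_idempotency_key(config: dict, flow_hash: str) -> str:
    """Hash the config together with the hash of the serialized flow."""
    # One dictionary holding everything that should count as part of a flow version.
    relevant = {"config": config, "flow_hash": flow_hash}
    # default=str is an assumption: values json cannot encode are stringified.
    payload = json.dumps(relevant, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


# Intended use with the calls shown in the thread:
#   key = build_idempotency_key(general_configs, flow.serialized_hash())
#   flow.register(project_name=..., idempotency_key=key)
```

With this key, a change to either the flow structure or the config values leads to a new version, while an unchanged pair does not.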

Jake

03/11/2022, 9:12 PM
When calling flow.register without changes, should the message output from Prefect be different? How do I know whether it was actually re-registered or not (without checking whether the version changed)?

Kevin Kho

03/11/2022, 9:30 PM
Ah, I see what you mean. There is only an indicator when you use the CLI, but none for flow.register(). I think the only way to get this at the moment is to take the id returned by flow.register() and then use the GraphQL API to examine it.
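A rough sketch of that lookup, under two assumptions that are not confirmed in the thread: that the flow table exposes a version field, and that the client behaves like prefect.Client(), whose graphql() result can be indexed as a mapping. The helper name get_flow_version is hypothetical.

```python
def get_flow_version(client, flow_id: str):
    """Return the version of the flow with this id, or None if it is not found.

    `client` is expected to behave like prefect.Client(): client.graphql(query)
    returns a mapping with the matching rows under ["data"]["flow"].
    """
    # Assumption: the flow table has a `version` field alongside `id`.
    query = """
    query {
      flow(where: {id: {_eq: "%s"}}) {
        id
        version
      }
    }
    """ % flow_id
    rows = client.graphql(query)["data"]["flow"]
    return rows[0]["version"] if rows else None


# Intended use (requires Prefect and a reachable backend):
#   flow_id = flow.register(project_name=..., idempotency_key=...)
#   version = get_flow_version(prefect.Client(), flow_id)
```

Comparing the returned version with the one recorded before the call would then show whether the registration created a new version.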

Jake

03/11/2022, 9:34 PM
Calling flow.register() on a flow without changes takes about the same amount of time as registering a new flow entirely. Is this expected? I checked that computing the hash takes less than 1/10th of a second, but the call takes much longer (a couple of seconds) to complete.
One solution I was thinking of is storing the hash locally to avoid an extra call to Prefect, since that seems to be the bottleneck.
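The local-hash idea could be sketched as below. This is an illustration only: the function names and the JSON file layout (a flow name to key mapping) are assumptions, and in a CI/CD setting the file would have to be persisted between pipeline runs for the check to be useful.

```python
import json
from pathlib import Path


def load_hashes(cache_path: Path) -> dict:
    """Read the {flow_name: idempotency_key} mapping saved by the previous run."""
    if cache_path.exists():
        return json.loads(cache_path.read_text())
    return {}


def needs_registration(flow_name: str, new_key: str, cache: dict) -> bool:
    """True when the flow is new or its key differs from the cached one."""
    return cache.get(flow_name) != new_key


def save_hashes(cache_path: Path, cache: dict) -> None:
    """Write the mapping back so the next run can compare against it."""
    cache_path.write_text(json.dumps(cache, indent=2, sort_keys=True))
```

The registration loop would call needs_registration before flow.register(), update the mapping for each flow it registers, and call save_hashes once at the end.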

Kevin Kho

03/11/2022, 9:35 PM
Yes, it’s expected, because both are just API calls, so that’s the time it takes to receive the response.

Jake

03/11/2022, 9:37 PM
For reference we have ~800 flows and making the register call on all of them takes over 40 mins.

Kevin Kho

03/11/2022, 9:39 PM
Is it something like registering the same Flow with different parameters? Wondering why you need to “submit” all of them for re-registration every time.

Jake

03/11/2022, 9:42 PM
This is part of a CICD pipeline; all the flows right now are submitted for registration since the pipeline does not know if a single flow has changed or not. Is there a better way to only register flows that have changed?

Kevin Kho

03/11/2022, 9:43 PM
The CLI does the idempotency key check. Let me check if the mechanism there is an API call.

Jake

03/11/2022, 9:44 PM
We are currently using the idempotency key check but the speed is similar whether there was a change or not

Kevin Kho

03/11/2022, 9:47 PM
Ah, OK, that makes sense, because I assume it’s an API call as well. I honestly don’t know if it will be faster, but the flow in the database has a serialized_flow that you could pull with:
query {
  flow(where: {id: {_eq: "66a3d613-aa6b-482b-8b2a-6d0e237dad44"}}){
    name
    created
    serialized_flow
  }
}
So you can maybe use GraphQL to pull all the flows ahead of time and then check whether the hash is there? That seems too convoluted, and I am unsure it will help. Alternatively, if you could just store the hash locally as you suggest, then maybe flow.register() can be called based on your local check?

Jake

03/11/2022, 9:48 PM
Maybe I can make a single API call and get all the existing hashes. It seems the overhead is just making the call, so a single call is not bad.
And then, if that hash already exists, I can skip the flow.register call.
This way I do not need to persist a local list of hashes.
Is this possible?
I don’t think I can just use the serialized_flow, since our hash contains other metadata.
Can I get a list of existing idempotency keys?

Kevin Kho

03/11/2022, 9:52 PM
I don’t see it exposed on the flow schema, so it would need to go in as a feature request to make that queryable.

Jake

03/11/2022, 9:53 PM
How can I submit a feature request?

Kevin Kho

03/11/2022, 9:55 PM
You can open one in the issues tab of the repo. I think that would solve the issue here, so we’ll see what the core devs say. Just note that I think the API returns 100 items per query, so for 800 you have to paginate and it will be 8 queries. Still, it will be a one-time cost, and the number of calls is reduced by a factor of 100.
I’d also encourage a quick write-up of the use case, just in case they have a better solution than mine.
Sorry about that!

Jake

03/11/2022, 10:06 PM

Anna Geller

03/12/2022, 9:52 AM
"This is part of a CICD pipeline; all the flows right now are submitted for registration since the pipeline does not know if a single flow has changed or not. Is there a better way to only register flows that have changed?"
@Jake What are you using for CI/CD? Most CI/CD solutions actually have hooks to determine which files changed. Many companies, like Google, have a monorepo strategy, so performing some action only for changed/added files is extremely common, and most CI/CD tools have a solution or some API hooks for it. I'm not saying it's super easy to implement, but this is definitely doable - e.g. GitHub Actions has extensions like this one that allow you to do:
- id: files
  uses: jitterbit/get-changed-files@v1
- run: |
    for changed_file in ${{ steps.files.outputs.all }}; do
      python registration_script.py --flow_file ${changed_file}
    done
So to implement your custom registration logic, instead of looping over flows within your custom registration script, you could shift this looping to the CI/CD script. Your script could accept the file name as an argument passed in from the CI/CD. This would actually make your registration process easier and more reliable, as I can imagine you may sometimes need to manually trigger re-registration for a single flow when something unexpected happens.
I also answered on GitHub - it would be easier to continue the discussion there to decide whether the issue should be kept or whether you'll give the CI/CD looping a try.
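A registration script that takes the file name from the CI/CD loop could be sketched as follows. The --flow_file flag matches the loop shown above; the --project flag, its default value, and the function names are hypothetical. Loading the flow relies on extract_flow_from_file from prefect.utilities.storage in Prefect 1.x, imported lazily so that argument parsing can be tried without Prefect installed.

```python
import argparse


def parse_args(argv=None):
    """Parse the arguments passed in by the CI/CD loop."""
    parser = argparse.ArgumentParser(description="Register the flow defined in one file.")
    parser.add_argument("--flow_file", required=True, help="Path of the changed flow file")
    # Hypothetical flag and default; use whatever project your flows belong to.
    parser.add_argument("--project", default="project_name", help="Prefect project name")
    return parser.parse_args(argv)


def register_flow_file(flow_file: str, project: str):
    """Load the flow object from the given file and register it."""
    # Lazy import: only needed when a registration is actually performed.
    from prefect.utilities.storage import extract_flow_from_file

    flow = extract_flow_from_file(file_path=flow_file)
    return flow.register(project_name=project)


def main(argv=None):
    args = parse_args(argv)
    return register_flow_file(args.flow_file, args.project)
```

Calling main() at the bottom of registration_script.py would make it usable from the loop above. As noted later in the thread, this only fits setups where each flow lives in its own file.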

Jake

03/14/2022, 1:51 PM
Thank you so much for the detailed write-up! We are using CircleCI, but I am not sure if there is a clean way we can take advantage of this, since we do not have “flow files”: our flows are programmatically built across many files. It might be more difficult to keep a list of the files that, when changed, should make a flow re-register. I am currently building an API trigger that will allow us to specify which flows to re-register. (also answered on GitHub)
👍 1
Update: using the idempotency key check works flawlessly; our deployments are actually about 10x faster 🤩

Kevin Kho

03/17/2022, 2:28 PM
Dang you were able to code it up? That’s awesome man!

Jake

03/17/2022, 2:33 PM
import prefect


def get_flow_idempotency_keys():
    """Collect the idempotency keys of all registered flow groups."""
    client = prefect.Client()  # reuse one client for every page
    keys = set()
    page = 0
    limit = 100  # page size
    while True:
        qry = {
            "query": {
                f"flow_group(limit: {limit}, offset: {page * limit})": [
                    "settings",
                ]
            }
        }
        response = client.graphql(qry)
        flow_groups = response["data"].flow_group

        for flow_group in flow_groups:
            keys.add(flow_group.settings.idempotency_key)

        # A short page means there is nothing left to fetch
        if len(flow_groups) < limit:
            break
        page += 1

    return keys
and then:
if idempotency_key in prev_idempotency_keys:
    print(f"[{flow.name}] already registered with key: {idempotency_key}")
    continue
in case anyone else has a similar usecase 🙂
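The skip check can also be factored into small pure functions that are easy to test without a Prefect backend. The sketch below is an assumption-laden variant, not the code used in the thread: the function names are hypothetical, rows are treated as plain mappings, and flow groups without an idempotency key in their settings are skipped rather than raising.

```python
def extract_idempotency_keys(flow_groups) -> set:
    """Collect keys from flow_group rows, ignoring groups registered without one."""
    keys = set()
    for group in flow_groups:
        settings = group.get("settings") or {}
        key = settings.get("idempotency_key")
        if key is not None:
            keys.add(key)
    return keys


def split_by_registration(candidate_keys: dict, existing_keys: set):
    """Split a {flow_name: key} mapping into names to register and names to skip."""
    to_register = [name for name, key in candidate_keys.items() if key not in existing_keys]
    to_skip = [name for name, key in candidate_keys.items() if key in existing_keys]
    return to_register, to_skip
```

Only the names in to_register would then be passed on to flow.register(), so unchanged flows cost no API call beyond the paginated key fetch.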

Kevin Kho

03/17/2022, 2:37 PM
That’s actually a lot more elegant than I thought