Jake
03/03/2022, 2:43 PM
Anna Geller
Jake
03/03/2022, 2:48 PM
from prefect import Flow

def get_flow(general_configs):
    with Flow(...) as flow:
        some_task(general_configs)
    return flow
where general_configs is an object generated from the YAML file.
Anna Geller
> We call this get_flow function when trying to register

Can you share how you do it? This would be important to see before we can make any recommendations about the registration process.
Jake
03/03/2022, 3:03 PM
from prefect import task

@task
def add(x, y=1):
    return x + y

So x and y would be parameters (sorry if this is the wrong term to use).
Anna Geller
flow = get_flow("/path/to/config.yaml")
flow.register("project_name")
Then, by default, this will trigger a flow registration and bump the flow version every time you run it, regardless of whether anything in the YAML changed.
Jake
03/03/2022, 3:04 PM
flow.register(
    project_name=prefect_config["project_name"],
    labels=prefect_config.get("labels"),
    idempotency_key=flow.serialized_hash(),
)
Anna Geller
idempotency_key=flow.serialized_hash(),
Doing this instead will always register the flow and bump up the flow version:
flow.register(
    project_name=prefect_config["project_name"],
    labels=prefect_config.get("labels"),
)
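To make the contrast between the two calls concrete, here is a toy in-memory model of the behavior described above. It assumes that a key matching the one used in the most recent registration of the same flow makes the backend return the existing flow ID without creating a new version, while omitting the key always creates one. `ToyRegistry` is hypothetical and is not Prefect's implementation.

```python
import uuid
from typing import Dict, Optional, Tuple


class ToyRegistry:
    """Hypothetical stand-in for a flow registry; NOT Prefect's implementation.

    Models the assumed rule: if idempotency_key matches the key used in the
    most recent registration of the same flow, no new version is created.
    """

    def __init__(self) -> None:
        # flow name -> (flow_id, version, idempotency key of latest registration)
        self._latest: Dict[str, Tuple[str, int, Optional[str]]] = {}

    def register(self, name: str, idempotency_key: Optional[str] = None) -> Tuple[str, int]:
        latest = self._latest.get(name)
        if latest is not None and idempotency_key is not None and latest[2] == idempotency_key:
            # Same key as last time: hand back the existing flow ID and version
            return latest[0], latest[1]
        version = 1 if latest is None else latest[1] + 1
        flow_id = str(uuid.uuid4())
        self._latest[name] = (flow_id, version, idempotency_key)
        return flow_id, version


registry = ToyRegistry()
print(registry.register("etl"))                       # no key: version 1
print(registry.register("etl"))                       # no key: version 2
print(registry.register("etl", idempotency_key="a"))  # new key: version 3
print(registry.register("etl", idempotency_key="a"))  # same key: still version 3
```

The key point of the model is that only an unchanged key suppresses a new version; leaving the key out is the "always register" case.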
Jake
03/03/2022, 3:06 PM
Anna Geller
idempotency_key. This way, if anything in your config has changed since the last registration, the flow version will be incremented.
Is general_configs just a path or a dictionary? If it's a dict, you could do:
import hashlib
import json

# Hash the config deterministically: sort_keys=True makes the digest
# independent of dict insertion order.
flow_idempotency_key = hashlib.sha256(
    json.dumps(general_configs, sort_keys=True).encode()
).hexdigest()

flow.register(
    project_name=prefect_config["project_name"],
    labels=prefect_config.get("labels"),
    idempotency_key=flow_idempotency_key,
)
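One caveat with hashing only general_configs: a change that flow.serialized_hash() would pick up but that leaves the config untouched would reuse the previous key, so no new version would be created. A minimal sketch, assuming you want either kind of change to bump the version, folds both into one key. The helper name `combined_idempotency_key` is hypothetical, and `default=str` is an assumption to cope with config values (such as dates) that `json.dumps` cannot encode natively.

```python
import hashlib
import json
from typing import Any, Dict


def combined_idempotency_key(flow_hash: str, config: Dict[str, Any]) -> str:
    """Hypothetical helper: a key that changes when either input changes.

    flow_hash is assumed to be the string returned by flow.serialized_hash().
    """
    # sort_keys=True sorts keys at every nesting level, so dict ordering does
    # not matter; default=str (an assumption) stringifies non-JSON values.
    payload = json.dumps(
        {"flow": flow_hash, "config": config}, sort_keys=True, default=str
    )
    return hashlib.sha256(payload.encode()).hexdigest()


# Usage with a Prefect 1.x flow object (not run here):
# flow.register(
#     project_name=prefect_config["project_name"],
#     labels=prefect_config.get("labels"),
#     idempotency_key=combined_idempotency_key(flow.serialized_hash(), general_configs),
# )
```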
Jake
03/03/2022, 3:16 PM
Anna Geller
@flow(name="versioned_flow", version=os.getenv("GIT_COMMIT_SHA"))
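The decorator above reads the version from a GIT_COMMIT_SHA environment variable, which your CI system would have to set. As a sketch under that assumption, the hypothetical helper below prefers the environment variable and falls back to asking Git for the checked-out commit, returning None when neither is available.

```python
import os
import subprocess
from typing import Mapping, Optional


def get_flow_version(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Hypothetical helper: the commit SHA to use as a flow version, if known."""
    sha = env.get("GIT_COMMIT_SHA")
    if sha:
        return sha
    try:
        # Fall back to the current commit when running inside a Git work tree
        result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (OSError, subprocess.CalledProcessError):
        return None  # git not installed, or not a repository
    return result.stdout.strip() or None
```

You could then pass `version=get_flow_version()` in place of the direct `os.getenv` call, so local runs outside CI still get a meaningful version.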
Jake
03/11/2022, 9:12 PM
flow.register without changes: should the message output from Prefect be different? How do I know whether it was actually re-registered (without checking whether the version changed)?
Kevin Kho
flow.register(). I think the only way to get this at the moment is to take the ID returned by flow.register() and then use the GraphQL API to examine it.
Jake
03/11/2022, 9:34 PM
Calling flow.register() on a flow without changes takes about the same amount of time as registering a new flow entirely. Is this expected? I checked that computing the hash takes less than a tenth of a second, but the call takes much longer (a couple of seconds) to complete.
Kevin Kho
Jake
03/11/2022, 9:37 PM
Kevin Kho
Jake
03/11/2022, 9:42 PM
Kevin Kho
Jake
03/11/2022, 9:44 PM
Kevin Kho
serialized_flow that you could pull with:
query {
  flow(where: {id: {_eq: "66a3d613-aa6b-482b-8b2a-6d0e237dad44"}}) {
    name
    created
    serialized_flow
  }
}
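Rather than pasting the flow ID into the query string, the same query can be written with a GraphQL variable. This is a sketch assuming a Hasura-style API (as the `_eq` filter suggests) in which `flow.id` uses the `uuid` scalar type; `build_flow_query` is a hypothetical helper that only builds the standard `{"query": ..., "variables": ...}` request body and does not send it.

```python
import json
from typing import Any, Dict

# Assumption: flow.id is typed `uuid` in the API's GraphQL schema.
FLOW_BY_ID_QUERY = """
query ($flow_id: uuid!) {
  flow(where: {id: {_eq: $flow_id}}) {
    name
    created
    serialized_flow
  }
}
"""


def build_flow_query(flow_id: str) -> Dict[str, Any]:
    """Hypothetical helper: request body for fetching one flow by its ID."""
    return {"query": FLOW_BY_ID_QUERY, "variables": {"flow_id": flow_id}}


body = build_flow_query("66a3d613-aa6b-482b-8b2a-6d0e237dad44")
print(json.dumps(body["variables"]))
# {"flow_id": "66a3d613-aa6b-482b-8b2a-6d0e237dad44"}
```

With the Prefect 1.x client this would presumably be sent as `client.graphql(FLOW_BY_ID_QUERY, variables={"flow_id": flow_id})`; check the `Client.graphql` signature of your installed version before relying on it.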
So you could maybe use GraphQL to pull all the flows ahead of time and then check whether the hash is there? That seems too convoluted, and I'm not sure it will help. But yes, if you could just store the hash locally like you suggest, then maybe flow.register() could be called based on your local check?
Jake
03/11/2022, 9:48 PM
Kevin Kho
Jake
03/11/2022, 9:53 PM
Kevin Kho
Jake
03/11/2022, 10:06 PM
Anna Geller
> This is part of a CI/CD pipeline; all the flows right now are submitted for registration since the pipeline does not know whether a single flow has changed. Is there a better way to only register flows that have changed?

@Jake What are you using for CI/CD? Most CI/CD solutions actually have hooks to determine which files changed. Many companies, like Google, use a monorepo strategy, so performing some action only for changed or added files is extremely common, and most CI/CD tools have a solution or some API hooks for it. I'm not saying it's super easy to implement, but it is definitely doable. For example, GitHub Actions has extensions like this one that let you do:
- id: files
  uses: jitterbit/get-changed-files@v1
- run: |
    for changed_file in ${{ steps.files.outputs.all }}; do
      python registration_script.py --flow_file ${changed_file}
    done
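A minimal sketch of the per-file `registration_script.py` that the loop above calls, assuming each flow file defines a module-level variable named `flow` (a convention assumed here, not something Prefect requires). It loads the file with `importlib` and returns that object; the actual `flow.register(...)` call is left as a comment because its arguments depend on your setup. Non-Python files and files without a `flow` are skipped, since the changed-files list from CI can contain anything.

```python
import argparse
import importlib.util
from pathlib import Path
from typing import Any, List, Optional


def load_flow(flow_file: str, attr: str = "flow") -> Optional[Any]:
    """Import a Python file by path and return its module-level `flow`, if any."""
    path = Path(flow_file)
    if path.suffix != ".py" or not path.is_file():
        return None  # e.g. a changed README or YAML file in the CI file list
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    if spec is None or spec.loader is None:
        return None
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the flow file's top-level code
    return getattr(module, attr, None)


def main(argv: Optional[List[str]] = None) -> int:
    parser = argparse.ArgumentParser(description="Register a single flow file.")
    parser.add_argument("--flow_file", required=True)
    args = parser.parse_args(argv)
    flow = load_flow(args.flow_file)
    if flow is None:
        print(f"Skipping {args.flow_file}: no `flow` object found")
        return 0
    # With Prefect 1.x you would register here, for example:
    # flow.register(project_name=..., idempotency_key=flow.serialized_hash())
    print(f"Would register {args.flow_file}")
    return 0
```

In the real script you would call `main()` from an `if __name__ == "__main__":` guard (e.g. `raise SystemExit(main())`), which also makes it easy to re-register a single flow by hand.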
So, to implement your custom registration logic, instead of looping over flows within your custom registration script, you could shift the looping to the CI/CD script. Your script could accept the file name as an argument passed from the CI/CD pipeline. This would actually make your registration process easier and more reliable, since I can imagine you may sometimes need to manually trigger re-registration for a single flow when something unexpected happens.
Jake
03/14/2022, 1:51 PM
Kevin Kho
Jake
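03/17/2022, 2:33 PM
import prefect


def get_flow_idempotency_keys():
    """Collect the idempotency keys of all registered flow groups, 100 at a time."""
    client = prefect.Client()
    keys = set()
    page = 0
    limit = 100
    while True:
        qry = {
            "query": {
                f"flow_group(limit: {limit}, offset: {page * limit})": [
                    "settings",
                ]
            }
        }
        response = client.graphql(qry)
        flow_groups = response["data"].flow_group
        for flow_group in flow_groups:
            # Flow groups registered without a key have no idempotency_key setting
            key = (flow_group.settings or {}).get("idempotency_key")
            if key is not None:
                keys.add(key)
        # A page shorter than the limit means this was the last page
        if len(flow_groups) < limit:
            break
        page += 1
    return keys


# Inside the loop over flows: skip any flow whose key is already registered
if idempotency_key in prev_idempotency_keys:
    print(f"[{flow.name}] already registered with key: {idempotency_key}")
    continue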
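The local check discussed earlier (storing the hash instead of querying the API) can be sketched as a small JSON file, hypothetically named `registered_keys.json`, that maps each flow name to the idempotency key of its last successful registration. Unlike the GraphQL lookup above, this makes no API call, but it assumes the file is persisted between CI runs (for example as a cached artifact) and that flows are not registered from anywhere else. All helper names here are hypothetical.

```python
import json
from pathlib import Path
from typing import Dict


def load_keys(path: Path) -> Dict[str, str]:
    """Read the flow-name -> last-registered-key map; empty if the file is missing."""
    if not path.exists():
        return {}
    return json.loads(path.read_text())


def needs_registration(keys: Dict[str, str], flow_name: str, key: str) -> bool:
    """True if this flow was never registered or its key has changed."""
    return keys.get(flow_name) != key


def save_key(path: Path, keys: Dict[str, str], flow_name: str, key: str) -> None:
    """Record a successful registration and write the map back to disk."""
    keys[flow_name] = key
    path.write_text(json.dumps(keys, indent=2, sort_keys=True))


# Usage sketch (flow and key come from your registration script):
# cache = Path("registered_keys.json")
# keys = load_keys(cache)
# if needs_registration(keys, flow.name, key):
#     flow.register(project_name=prefect_config["project_name"], idempotency_key=key)
#     save_key(cache, keys, flow.name, key)
```

Saving the key only after `flow.register()` returns means a failed registration is retried on the next run.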
Kevin Kho