Constantino Schillebeeckx
09/02/2021, 4:31 PM
if not args.dry_run:
    logger.log(SUCCESS, f"Registering flow {flow.name}")
    flow.register(project_name=args.project, idempotency_key=flow.serialized_hash())
Everything is working as expected, except I'm getting more versions of the flow (after multiple registrations) than I was expecting. It makes me wonder what I don't understand about the idempotency_key - could someone explain to me what could cause the serialized_hash to change unexpectedly? For example, if a different VM registers flows, would the hash change?
Kevin Kho
datetime.datetime.now() could change the serialized hash. Do you have something like this?
Constantino Schillebeeckx
09/02/2021, 4:34 PM
import pendulum
from decouple import config
from prefect import Flow
from prefect.schedules import CronSchedule
from flows.utils.dbt import DbtTask
dbt = DbtTask(profiles_dir=config("DBT_PROFILES_DIR"), return_all=True, log_stderr=True)
with Flow("dbt_test") as flow:
    # run at 08:00 in whatever is the local timezone of the script
    flow.schedule = CronSchedule("0 8 * * *", start_date=pendulum.now())
    dbt(command="dbt test")
if __name__ == "__main__":
    flow.run()  # so we can easily test
is one example, I'm guessing that start_date is the guilty party?
Kevin Kho
Constantino Schillebeeckx
09/02/2021, 4:35 PM
Kevin Kho
with clause btw, though I think this will work
Constantino Schillebeeckx
09/02/2021, 4:38 PM
schedule = CronSchedule(...)
with Flow('name', schedule=schedule) as flow:
Kevin Kho
with Flow(..) as flow:
    ....
flow.schedule = ...
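The effect discussed above (a clock-derived start_date changing the serialized hash, and so producing a new flow version on every registration) can be sketched with the standard library alone. This is a minimal illustration, not Prefect's implementation: serialized_hash, build_flow, and ToyRegistry are hypothetical stand-ins, the dictionary is an assumed serialized form, and the two timestamps simulate what pendulum.now() would return on separate registration runs.

```python
import hashlib
import json
from datetime import datetime, timezone


def serialized_hash(flow_dict):
    """Hash a JSON description of a flow (hypothetical stand-in for flow.serialized_hash())."""
    payload = json.dumps(flow_dict, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


def build_flow(start_date):
    """Assumed serialized form of a flow with a cron schedule."""
    return {
        "name": "dbt_test",
        "schedule": {"cron": "0 8 * * *", "start_date": start_date.isoformat()},
    }


class ToyRegistry:
    """Hypothetical registry: bumps the version only when the idempotency key changes."""

    def __init__(self):
        self.version = 0
        self.last_key = None

    def register(self, idempotency_key):
        if idempotency_key != self.last_key:
            self.version += 1
            self.last_key = idempotency_key
        return self.version


# Simulated clock readings for three separate registration runs.
run_times = [
    datetime(2021, 9, 2, 16, 31, tzinfo=timezone.utc),
    datetime(2021, 9, 2, 16, 34, tzinfo=timezone.utc),
    datetime(2021, 9, 2, 16, 38, tzinfo=timezone.utc),
]

# start_date taken from the clock: each run serializes differently.
drifting = ToyRegistry()
for now in run_times:
    drifting.register(serialized_hash(build_flow(now)))

# start_date pinned to a constant: each run serializes identically.
fixed_start = datetime(2021, 9, 1, 8, 0, tzinfo=timezone.utc)
pinned = ToyRegistry()
for _ in run_times:
    pinned.register(serialized_hash(build_flow(fixed_start)))

print("versions with clock-derived start_date:", drifting.version)
print("versions with pinned start_date:", pinned.version)
```

Under these assumptions, pinning start_date to a fixed datetime keeps the key stable across runs, so repeated registrations of unchanged code do not create new versions.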
Constantino Schillebeeckx
09/02/2021, 4:39 PM
Kevin Kho
Constantino Schillebeeckx
09/02/2021, 4:40 PM
Kevin Kho