Michael Hadorn
10/21/2021, 7:07 AM

Michael Hadorn
10/21/2021, 7:09 AM
import time
from datetime import datetime
import prefect
from prefect import Flow, task, Parameter, unmapped
from prefect.storage import S3, Docker
from prefect.run_configs import DockerRun
@task
def say(my_time, my_time2):
    logger = prefect.context['logger']
    logger.info(f"my_time: {my_time} | my_time2: {my_time2}")

# computed at module level, i.e. outside the flow context, at build/registration time
date_format = '%Y-%m-%d %H:%M:%S.%f'
now = datetime.now()
my_time = now.strftime(date_format)

time.sleep(1)

storage = S3(
    bucket=""
    , client_options=dict()
)

with Flow(
    "Test the Storage"
    , storage=storage
    , run_config=DockerRun(
        image=""
    ),
) as flow:
    # computed inside the flow context, but still evaluated at build time
    now2 = datetime.now()
    my_time2 = now2.strftime(date_format)
    t1 = say(my_time, my_time2)

if __name__ == '__main__':
    # flow.run()
    hash = flow.serialized_hash()
    print(hash)
    flow.register(
        project_name="cdwh prefect-development"
        , labels=['prefect-development']
        # , idempotency_key=hash
        , set_schedule_active=False
    )
So both times (the one computed outside the flow context and the one inside) are frozen at registration time.
Now, if I register the flow again with idempotency_key=flow.serialized_hash(), no new version is created, even though the times have changed.

Michael Hadorn
10/21/2021, 7:17 AM

emre
10/21/2021, 8:38 AM
TLDR: The pickled flow stores your constant datetimes, but the serialized hash does not use constants to calculate the hash value.
First, try visualizing this flow. There is no task for your constant datetimes, because they are calculated by Python at that specific line and passed to your say task as constant values. Whether that happens inside or outside the flow context doesn't matter for those.
In that case, where are your constant datetime values stored?
Turns out flow instances have a dictionary field called constants, where the flow stores which tasks should receive which constants. This field is of course pickled in the file we upload to S3. However, it is not included in the hash calculation. The included fields, I believe, are these:
https://github.com/PrefectHQ/prefect/blob/efa558f4117ca2c801325a745999b64671fde4e1/src/prefect/serialization/flow.py#L82
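A minimal sketch (Prefect 1.x; the flow name and task here are made up for illustration) of what emre describes: the constant datetime lands in flow.constants, which gets pickled, but flow.serialized_hash() comes out the same even when the constant changes.

from datetime import datetime

from prefect import Flow, task


@task
def say(my_time):
    # in a real flow you would grab the logger from prefect.context here
    print(my_time)


with Flow("constants-demo") as flow_a:
    # evaluated right here at build time -> stored as a constant, not as a task
    say(datetime.now().isoformat())

with Flow("constants-demo") as flow_b:
    say(datetime.now().isoformat())  # a different constant value

print(flow_a.constants)  # the frozen datetime string lives here (and gets pickled)
# the hashes match even though the constants differ
print(flow_a.serialized_hash() == flow_b.serialized_hash())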
emre
10/21/2021, 8:40 AM

Anna Geller
@task
def get_current_time():
    date_format = '%Y-%m-%d %H:%M:%S.%f'
    now = datetime.now()
    return now.strftime(date_format)

with Flow("Test the Storage") as flow:
    my_time = get_current_time()
    my_time_2 = get_current_time()
    t1 = say(my_time, my_time_2)
Michael Hadorn
10/21/2021, 9:55 AM
I use from prefect.utilities.storage import extract_flow_from_file, where the objects are loaded as a constant (but the call of this is in a task).
This flow.register() call was set up with an idempotency_key so that a new version is only registered if something changed.
Now: if there is a config change, it is reflected in the object collection (as a constant, as I learned), but it is not reflected in a different hash.
So my workaround is to remove the idempotency_key. This works, but then I register a new version every day, which is not optimal.
In this (maybe edge) case it would be very nice to also include the constants in the hash. But I don't know whether some constants are run-dependent; those, of course, I could not include.
But then it would be awesome to let the user manually add variables that should be included in the hash. What do you think?
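As a rough sketch of that idea (hash_with_constants is a hypothetical helper, not a Prefect feature), the constants stored on the flow could be folded into the idempotency key next to flow.serialized_hash():

import hashlib


def hash_with_constants(flow) -> str:
    # flow.constants maps each task to the {arg_name: value} constants it receives
    constants_repr = repr(
        {task.name: kwargs for task, kwargs in flow.constants.items()}
    )
    # combine the regular serialized hash with a hash over the constants
    return hashlib.sha256(
        (flow.serialized_hash() + constants_repr).encode()
    ).hexdigest()


# usage: flow.register(..., idempotency_key=hash_with_constants(flow))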
Anna Geller
flow.serialized_hash() and the config object?

Michael Hadorn
10/21/2021, 10:12 AM

Michael Hadorn
10/21/2021, 10:25 AM
With extract_flow_from_file() you cannot access the local variables of the flow file; you only get the flow object itself. All the local variables are there (exec_vals in https://github.com/PrefectHQ/prefect/blob/master/src/prefect/utilities/storage.py#L88), but they cannot be returned. Would be a nice thing...
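For reference, a minimal sketch of that limitation (the file path is hypothetical): extract_flow_from_file exec's the flow file and returns only the Flow object, so anything that only lived in the file's local variables is gone unless the file attached it to the flow.

from prefect.utilities.storage import extract_flow_from_file

# hypothetical path to a flow file that loads config objects at module level
flow = extract_flow_from_file(file_path="flows/my_flow.py")

print(flow.name)              # the Flow object itself is fully usable...
print(flow.serialized_hash())
print(flow.constants)         # ...and constants bound to tasks are reachable here

# ...but the flow file's local variables (exec_vals) were discarded inside
# extract_flow_from_file, so e.g. a config dict built there is not accessible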
Michael Hadorn
10/25/2021, 8:23 AM
flow.my_data = entities
Then I added a method which also hashes these objects and appends the result to the existing flow.serialized_hash(), and I use that as the idempotency_key.
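A sketch of that workaround (entities, flow.my_data, combined_idempotency_key and the file path are assumed names, not Prefect API): the flow file attaches the loaded config objects to the flow, and the registration script hashes them together with flow.serialized_hash().

import hashlib
import json

from prefect.utilities.storage import extract_flow_from_file

# the flow file is assumed to have done: flow.my_data = entities
flow = extract_flow_from_file(file_path="flows/my_flow.py")


def combined_idempotency_key(flow) -> str:
    # hash the attached config objects (assumes they are JSON-serializable)
    data_hash = hashlib.sha256(
        json.dumps(flow.my_data, sort_keys=True, default=str).encode()
    ).hexdigest()
    # append it to the regular serialized hash
    return hashlib.sha256(
        (flow.serialized_hash() + data_hash).encode()
    ).hexdigest()


flow.register(
    project_name="cdwh prefect-development",
    labels=["prefect-development"],
    idempotency_key=combined_idempotency_key(flow),
    set_schedule_active=False,
)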