Hi there I'm still struggling with the concept of ...
# ask-community
m
Hi there, I'm still struggling with the concept of pickle-based storage. We use S3 as storage and DockerRun. I read the docs here: https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-based-storage. What exactly is persisted in the pickled file?
• For sure the flow object
• But global variables are frozen too (e.g. datetime.now() is never refreshed in the run, even if it's called outside of the flow context)
• But every piece of code in a task will be executed in the run
So if I build some objects while the flow is built / registered, these objects are frozen, right? But if I change some methods of these objects, rebuild the image and rerun the flow based on the "previous" flow definition, the new method is called (but not if I change the init method). Is there an explanation of which parts are dynamic and which parts are frozen? Thanks for any help!
import time
from datetime import datetime

import prefect
from prefect import Flow, task, Parameter, unmapped
from prefect.storage import S3, Docker
from prefect.run_configs import DockerRun


@task
def say(my_time, my_time2):
    logger = prefect.context['logger']
    logger.info(f"my_time: {my_time} | my_time2: {my_time2}")


date_format = '%Y-%m-%d %H:%M:%S.%f'

now = datetime.now()
my_time = now.strftime(date_format)
time.sleep(1)

storage = S3(
    bucket=""
    , client_options=dict()
)

with Flow(
    "Test the Storage"
    , storage=storage
    , run_config=DockerRun(
        image=""
    ),
) as flow:
    now2 = datetime.now()
    my_time2 = now2.strftime(date_format)

    t1 = say(my_time, my_time2)


if __name__ == '__main__':
    # flow.run()
    hash = flow.serialized_hash()
    print(hash)
    flow.register(
        project_name="cdwh prefect-development"
        , labels=['prefect-development']
        # , idempotency_key=hash
        , set_schedule_active=False
    )
So both times (outside and inside the flow context) are frozen at registration. Now if I register the flow again with idempotency_key=flow.serialized_hash(), nothing changes, even though the time changed.
Using Prefect 0.15.5.
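The method-versus-init observation above matches how pickling behaves in general. The following is a minimal sketch using only the standard library `pickle` module, not Prefect itself. The `Entity` class is hypothetical, and redefining it in the same file stands in for "rebuilding the image". Prefect's pickle-based storage uses cloudpickle, which, as far as I know, treats classes imported from an installed module the same way (stored by reference), but treat that as an assumption.

```python
import pickle


class Entity:  # hypothetical class, version shipped at registration time
    def __init__(self, name):
        # Runs once, when the object is built (i.e. at registration time).
        self.name = name
        self.label = f"v1:{name}"

    def describe(self):
        return f"old describe -> {self.label}"


# "Registration": build the object and pickle it.
payload = pickle.dumps(Entity("orders"))


# "Rebuilt image": same class name, but __init__ and describe both changed.
class Entity:  # deliberate redefinition
    def __init__(self, name):
        self.name = name
        self.label = f"v2:{name}"

    def describe(self):
        return f"new describe -> {self.label}"


# "Flow run": unpickling restores the stored attributes without calling __init__,
# while methods are looked up on whatever class definition exists now.
restored = pickle.loads(payload)
result = restored.describe()
print(result)  # new describe -> v1:orders
```

The pickle holds the instance attributes plus a reference to the class by name, so attribute values set in `__init__` are frozen while method bodies come from the code available at run time.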
e
Hey @Michael Hadorn. Before I explain why this happens: I think this is weird behavior as well.
TL;DR: The pickled flow stores your constant datetimes, but the serialized hash does not use constants to calculate the hash value.
First, try visualizing this flow. There is no task for your constant datetimes, because they are computed by Python at that specific line and passed to your say task as constant values. Inside or outside the flow context doesn't matter for those. So where are your constant datetime values stored? It turns out flow instances have a dictionary field called constants, where the flow stores which tasks should receive which constants. This field is pickled in the file we upload to S3. However, the field is not included in the hash calculation. I believe the included fields are these: https://github.com/PrefectHQ/prefect/blob/efa558f4117ca2c801325a745999b64671fde4e1/src/prefect/serialization/flow.py#L82
Should constants be included in the hash? I believe so, since flow behavior changes. Would this inclusion break anything? @Kevin Kho
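To make the point about constants concrete, here is a rough sketch that does not use Prefect at all. `MiniFlow`, `metadata_hash` and `build_flow` are hypothetical names, and the assumption that only the flow name and task names feed the hash is a simplification of what the linked serializer does.

```python
import hashlib
import json
import pickle
from datetime import datetime


class MiniFlow:
    """Hypothetical stand-in for a flow: task names plus constants."""

    def __init__(self, name, task_names, constants):
        self.name = name
        self.task_names = task_names
        self.constants = constants  # values computed while building the flow

    def metadata_hash(self):
        # Assumption: only structural metadata is hashed, constants are not.
        meta = {"name": self.name, "tasks": sorted(self.task_names)}
        return hashlib.sha256(json.dumps(meta, sort_keys=True).encode()).hexdigest()


def build_flow(now):
    # Like `my_time = now.strftime(...)` in the flow file: evaluated at build time.
    return MiniFlow("Test the Storage", ["say"], {"say": {"my_time": now.isoformat()}})


first = build_flow(datetime(2021, 9, 1, 8, 0, 0))
second = build_flow(datetime(2021, 9, 2, 8, 0, 0))

# Same structure, so the metadata hash is identical ...
same_hash = first.metadata_hash() == second.metadata_hash()
# ... but the pickled payloads differ, because the constants differ.
same_pickle = pickle.dumps(first) == pickle.dumps(second)
# After a round trip the constant is exactly what was computed at build time.
frozen = pickle.loads(pickle.dumps(first)).constants["say"]["my_time"]
print(same_hash, same_pickle, frozen)
```

In this toy model, registering with the metadata hash as the idempotency key would treat both builds as unchanged even though their stored constants differ, which is the behavior described in the question.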
a
Good question @emre. Afaik, the hash is primarily needed to determine whether flow metadata changed, i.e. things like storage, run configuration, schedule, and the structure of your flow (its tasks). Global variables and constants don't belong to the metadata that Prefect stores. @Michael Hadorn, the easiest way to solve it and not get tripped up by what is frozen and what is not is to move this dynamically determined date into a separate task and pass it downstream as a data dependency. This way, you can be sure that your flow will behave as you expect.
@task
def get_current_time():
    date_format = '%Y-%m-%d %H:%M:%S.%f'
    now = datetime.now()
    return now.strftime(date_format)


with Flow("Test the Storage") as flow:
    my_time = get_current_time()
    my_time2 = get_current_time()
    t1 = say(my_time, my_time2)
m
Thanks a lot for your answers @Anna Geller @emre! I agree that if you put this code in its own task, it works as it should. And I guess I understand how this storage works much better now. With my example I tried to make a minimal, easy-to-understand case. In our real case we build a generic flow based on objects from a database. There is also the possibility to set a config for these objects (which will be frozen at registration time). This config can change. To get this config change into the flow, we have a separate register flow, which rebuilds the actual flow on a daily basis (it is rebuilt with from prefect.utilities.storage import extract_flow_from_file, where the objects are loaded as a constant, but the call happens in a task). This register flow called flow.register() with an idempotency_key to only update the flow if something changed. Now: if there is a config change, it is reflected in the object collection (as a constant, as I learned), but not in a different hash. So my workaround is to remove the idempotency_key. This is fine, but I generate a new version every day, which is not optimal. In this (maybe edge) case, it would be very nice to also use the constants for the hash. But I do not know whether some constants are run-dependent; if so, I certainly cannot add them. But then it would be awesome to let the user manually add variables that should be included in the hash. What do you think?
a
Thanks for sharing your use case! This is indeed a bit more involved than just wrapping dynamic code into a task. 🙂 It looks like you've got it working, and the only problem is that the version gets incremented every day. Overall, when it comes to versioning, you can construct any idempotency key you'd like for pseudo-versioning. So perhaps you could build a function that hashes this config object and pass that hash (rather than the hash of the flow object) as the idempotency key when registering?
@Michael Hadorn or perhaps go even as far as building a hash that combines both flow.serialized_hash() and the config object?
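One possible shape for such a combined key, as a sketch only: `combined_idempotency_key` is a hypothetical helper, the config is assumed to be a JSON-serializable dict (non-JSON values fall back to `str`), and `"abc123"` is a placeholder for the string returned by flow.serialized_hash().

```python
import hashlib
import json


def combined_idempotency_key(flow_hash: str, config: dict) -> str:
    """Hash the flow's own hash together with a canonical dump of the config."""
    # sort_keys makes the dump independent of dict insertion order.
    config_bytes = json.dumps(config, sort_keys=True, default=str).encode("utf-8")
    digest = hashlib.sha256()
    digest.update(flow_hash.encode("utf-8"))
    digest.update(b"\x00")  # separator between the two parts
    digest.update(config_bytes)
    return digest.hexdigest()


flow_hash = "abc123"  # placeholder for flow.serialized_hash()
key_a = combined_idempotency_key(flow_hash, {"entity": "orders", "batch": 100})
key_b = combined_idempotency_key(flow_hash, {"batch": 100, "entity": "orders"})
key_c = combined_idempotency_key(flow_hash, {"entity": "orders", "batch": 200})

print(key_a == key_b)  # True: same config, different key order
print(key_a == key_c)  # False: config changed
```

The result would then be passed as idempotency_key to flow.register(), so a new version is only created when either the flow metadata or the config changes.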
m
Ah well 😅 - yes, I guess that's a valid solution. Thanks a lot for your help. I will try this. Before your answers, I was thinking more that I didn't get the concept of storage. But now it's really much clearer. Merci!
👍 1
FTR: If you use extract_flow_from_file(), you cannot access the local variables of the flow file. You only get the flow object itself. All the local variables are there (exec_vals in https://github.com/PrefectHQ/prefect/blob/master/src/prefect/utilities/storage.py#L88), but they are not returned. Would be a nice thing...
Update: to bypass the additional objects I use a custom property on the flow. E.g.
flow.my_data = entities
Then I added a method that also hashes these objects and adds the result to the existing flow.serialized_hash(), and I use this as the idempotency_key.
👍 1