# ask-community
e
Hello 🙂 I see in the `prefect.core.flow.Flow.serialized_hash` documentation that if the hash of the flow hasn’t changed, the flow won’t be uploaded to the server again when calling `register`. But I get the same hash every time, and the server keeps creating new versions of the flow 🤔
a
Hi @Eric Feldman, perhaps you could use the `prefect register` CLI instead? Flows with unchanged metadata won’t be registered again. Here is the documentation about that: https://docs.prefect.io/api/latest/cli/register.html
e
I can’t use the CLI because it’s all part of a flow that starts from a FastAPI call, so everything happens in my code. I tried using `client.register` instead of `flow.register`, and I’m facing the same issue.
a
@Eric Feldman sure, let’s do it this way then. Can you share your Prefect version and the code you use to register the flow? I will try to reproduce it.
e
Prefect version is `0.15.6`. Code:
```python
from time import sleep
from typing import Callable

import dask.distributed
import prefect

project_name = 'pname'
client = prefect.Client()
client.create_project(project_name)

def add(x, y):
    prefect.context['logger'].info(f'{x}+{y}')
    return x + y

class ExpTask(prefect.Task):
    """Wraps an arbitrary callable as a task named after that callable."""
    def __init__(self, method: Callable):
        super().__init__(on_failure=self.on_failure, name=method.__name__)
        self.method = method

    def run(self, **kwargs):
        prefect.context['logger'].info('sleeping')
        sleep(3)
        prefect.context['logger'].info('running method')
        return self.method(**kwargs)

    def on_failure(self, task, state):
        # todo: how do I get the exception itself?
        logger = prefect.context['logger']
        logger.info(dir(state))
        logger.info('~~~1111~~~')


# In-process Dask cluster (threads, no separate worker processes)
executor = prefect.executors.DaskExecutor(cluster_class=dask.distributed.LocalCluster,
                                          cluster_kwargs={'processes': False})
with prefect.Flow(name='test', executor=executor, storage=prefect.storage.Local()) as f:
    num1 = ExpTask(add)(x=1, y=2)
    num2 = ExpTask(add)(x=num1, y=8)

# client.register(f, project_name=project_name)
f.register(project_name=project_name)
```
thanks!
a
So if you want to use `serialized_hash`, you would need to add:
```python
if __name__ == "__main__":
    flow.register(
        project_name="your_project_name",
        idempotency_key=flow.serialized_hash(),
    )
```
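To make the `idempotency_key` behaviour concrete, here is a toy in-memory model of the version bookkeeping. `ToyFlowRegistry` is a hypothetical stand-in written for illustration, not Prefect's server code. It assumes only the semantics the Prefect 0.15 `register` docstring describes: a key that matches the most recent registration returns the existing version instead of creating a new one, while no key means a new version on every call (as with a plain `f.register(project_name=project_name)`).

```python
from typing import Optional


class ToyFlowRegistry:
    """Hypothetical in-memory model of flow-version bookkeeping (not Prefect code)."""

    def __init__(self) -> None:
        self.version = 0
        self.last_key: Optional[str] = None

    def register(self, idempotency_key: Optional[str] = None) -> int:
        # A key equal to the one used by the most recent registration is a no-op:
        # the existing version is returned and nothing new is created.
        if idempotency_key is not None and idempotency_key == self.last_key:
            return self.version
        # No key, or a different key: create a new version.
        self.version += 1
        self.last_key = idempotency_key
        return self.version


if __name__ == "__main__":
    registry = ToyFlowRegistry()
    print(registry.register())       # 1
    print(registry.register())       # 2  (no key: always a new version)
    print(registry.register("abc"))  # 3
    print(registry.register("abc"))  # 3  (same key as last time: unchanged)
    print(registry.register("def"))  # 4
```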
e
Cool, thanks!! If I pass `build=True` to `serialized_hash`, will it detect code changes as well?
a
@Eric Feldman Which storage do you use? The docstring says that with `build=True`, the flow’s environment is built prior to serialization. So if you use, e.g., S3 storage with `local_script_path`, I would imagine this would upload your flow to S3 before serializing the Flow object and determining whether registration needs to happen. So it should not affect whether reregistration takes place or not. But I will ask the team specifically about your type of storage.
e
@Anna Geller Locally I’m using Local storage, but I can test it out with S3 storage. In the docs there is a comment that in order to detect code changes in the flow, I need to pass `build=True`. But when I pass it, a new hash is generated for the same flow even if no code was changed, and that happens with both Local and S3 storage.
a
I don’t think that you need to pass `build=True`, because it’s False by default. Could you try the default implementation and check if registration works as expected this way?
```python
if __name__ == "__main__":
    flow.register(
        project_name="your_project_name",
        idempotency_key=flow.serialized_hash(),
    )
```
What this comment means is that you need `build=True` only if you want to be 100% sure that the changes you make inside your tasks will be reflected in the versioning, regardless of which storage you use. But, for instance, if you add new tasks or change the order of tasks, this will always result in a new Flow version, because the structure of the Flow changes. Overall, if you want to be 100% sure that any change to the flow results in a new version upon registration, I wouldn’t use the serialized hash at all and would just keep incrementing the version on every registration. To give you a more concrete answer, I asked the team and will get back to you with more info on the impact of the `build=True` argument.
e
I want the flow to get a newer version if the code of a task changed as well (in my solution I have a task that runs a callable it receives in its constructor). It’s not only metadata changes that matter to me. But when I switched to `build=True`, it updated the version even when nothing had changed. This is why I wanted to run the DAG in the cluster without registering it; it’s kind of a one-time task that might or might not be run again in the future.
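One possible workaround for wanting a new version whenever a wrapped callable changes (not something suggested in this thread, just a sketch): build your own `idempotency_key` by combining the structural hash with a fingerprint of each callable’s bytecode, names and constants. `code_aware_key` and `_code_fingerprint` are hypothetical helpers. Caveats: bytecode differs between Python versions, only plain Python functions with `__code__` are supported, and only the listed callables are covered, not the functions they call.

```python
import hashlib
import types
from typing import Callable, Iterable


def _code_fingerprint(code: types.CodeType) -> bytes:
    """Bytes that change when a function's logic changes (bytecode, names, constants)."""
    parts = [code.co_code, repr(code.co_names).encode()]
    for const in code.co_consts:
        if isinstance(const, types.CodeType):
            # Nested functions/lambdas: recurse instead of using repr(), which
            # embeds a memory address and would differ between runs.
            parts.append(_code_fingerprint(const))
        else:
            parts.append(repr(const).encode())
    return b"\x00".join(parts)


def code_aware_key(structure_hash: str, callables: Iterable[Callable]) -> str:
    """Combine a structural hash with fingerprints of the given callables."""
    digest = hashlib.sha256(structure_hash.encode())
    for func in callables:
        digest.update(func.__qualname__.encode())
        digest.update(_code_fingerprint(func.__code__))
    return digest.hexdigest()


# Hypothetical usage with the flow from the thread (requires Prefect 0.15):
#   f.register(project_name=project_name,
#              idempotency_key=code_aware_key(f.serialized_hash(), [add]))
```

Because the key now changes whenever `add`’s body changes, a registration after such an edit would create a new version, while repeated registrations of unchanged code would reuse the existing one.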
a
Got it, will check with the team and get back to you
@Eric Feldman I can now share more:
• If `build` is True, the Flow’s hash is computed using the built storage, meaning that `storage.build()` is called before the flow is serialized and hashed.
• As a result, the serialized hash includes the flow’s storage, so passing `build` as True/False will change the hash.
• It works the same way regardless of the storage type you choose.
LMK if something is still unclear.
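A toy model of the point above. If the hash covers the flow metadata *and* the storage, and the build step writes something build-specific into the storage (here a hypothetical per-build location string), then `build=True` yields a different hash on every call, which would match the behaviour Eric observed. `toy_serialized_hash` and the per-build location are assumptions for illustration. This is not Prefect’s `serialized_hash` implementation, and which storage field actually changes in Local or S3 storage hasn’t been confirmed here.

```python
import hashlib
import itertools
import json

# Hypothetical build counter standing in for whatever makes each build unique
# (for example a timestamped file name); purely illustrative.
_builds = itertools.count(1)


def toy_serialized_hash(flow_meta: dict, storage: dict, build: bool = False) -> str:
    """Hash flow metadata together with its storage description (toy model)."""
    storage = dict(storage)  # don't mutate the caller's dict
    if build:
        # Toy "storage.build()": records a location that is new on every build.
        storage["flow_location"] = f"flows/{flow_meta['name']}/build-{next(_builds)}"
    payload = {"flow": flow_meta, "storage": storage}
    blob = json.dumps(payload, sort_keys=True).encode()
    return hashlib.sha256(blob).hexdigest()


if __name__ == "__main__":
    meta = {"name": "test", "tasks": ["add-1", "add-2"], "edges": [["add-1", "add-2"]]}
    storage = {"type": "Local"}
    print(toy_serialized_hash(meta, storage) == toy_serialized_hash(meta, storage))  # True
    print(toy_serialized_hash(meta, storage, build=True)
          == toy_serialized_hash(meta, storage, build=True))  # False
```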
e
The main question is whether it will detect changes in the code, and won’t update the flow otherwise.
a
Good question. Due to the hybrid execution model, Prefect is NOT aware of code changes inside your tasks because we don’t track that information. Prefect operates purely on metadata. This is why `serialized_hash` is used only to detect changes in your flow structure, not changes in your task’s code.
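A minimal sketch of metadata-only hashing, showing why a change inside a task’s function leaves the hash untouched while adding a task changes it. `structure_hash` is a hypothetical helper that hashes only task names and edges; per this thread, Prefect’s metadata also covers things like the schedule, storage and run configuration, which this toy leaves out.

```python
import hashlib
import json
from typing import Callable, Dict, List, Tuple


def structure_hash(tasks: Dict[str, Callable], edges: List[Tuple[str, str]]) -> str:
    """Hash only the flow's structure: task names and upstream->downstream edges.

    The callables are accepted but deliberately ignored, mirroring a
    metadata-only hash that never looks at task code.
    """
    meta = {"tasks": sorted(tasks), "edges": sorted(edges)}
    return hashlib.sha256(json.dumps(meta).encode()).hexdigest()


def add(x, y):
    return x + y


def add_changed(x, y):
    # Same signature, different logic.
    return (x + y) * 10


if __name__ == "__main__":
    edges = [("add-1", "add-2")]
    v1 = structure_hash({"add-1": add, "add-2": add}, edges)
    v2 = structure_hash({"add-1": add_changed, "add-2": add_changed}, edges)
    v3 = structure_hash({"add-1": add, "add-2": add, "add-3": add},
                        edges + [("add-2", "add-3")])
    print(v1 == v2)  # True: changing task code is invisible to the hash
    print(v1 == v3)  # False: a new task and edge change the structure
```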
e
So either it updates the flow version even when nothing has changed, or I might end up with old code in the flow. Got it, thanks 🙂
a
I believe that as long as you register your flow after every code change, and you build storage on registration, you will not use old code, because Prefect gets your code from storage within each FlowRun. Only the flow version is affected by the `idempotency_key`, not the storage.
Reregistration is only needed if your Flow structure changed in your code, e.g. a new task was added. And this is what `serialized_hash` is good for.
e
> I believe that as long as you register your flow after every code change
Since the flow isn’t an ETL and is just a “heavy CPU task runner”, I don’t really know when something changes.
> Reregistration is only needed if your Flow structure changed in your code
If I have a flow with 1 task, this task calls a method `foo`, and I change `foo`’s logic, will the flow run the new `foo` code without registering it?
a
Yes, if you have a script-based storage like S3 or GitHub, and nothing changed in your Flow structure (i.e. no new tasks or edges, and tasks are still in the same order as before), then it’s fine if only some logic inside your task changed. You commit your new Flow Python file to Git or S3, and when the Flow is scheduled or invoked, the FlowRunner picks up the latest version of your code from storage and runs it with your changes, as expected. The Python file version in Storage doesn’t need to be the same as the one used at registration, as long as the Flow’s metadata remained unchanged. Registration is used by Prefect to infer metadata about your Flow: your tasks, the order of tasks and the dependencies between them, the schedule, your storage and run configuration. But Prefect doesn’t know what your tasks are doing before runtime.
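The runtime behaviour described above can be modelled with a toy script-based storage. "Registration" records only metadata (here, a list of task names read from the script), and every "run" re-reads the script from storage, so a changed function body takes effect without re-registering. `ToyScriptStorage`, `register` and `run` are hypothetical names for illustration, not Prefect APIs; the sketch uses the standard-library `runpy.run_path` to execute whatever the file currently contains.

```python
import pathlib
import runpy
import tempfile
from typing import Dict, List


class ToyScriptStorage:
    """Hypothetical stand-in for script-based storage (e.g. a file in S3 or Git)."""

    def __init__(self, path: pathlib.Path) -> None:
        self.path = path

    def load(self) -> dict:
        # Execute the current file contents and return its globals.
        return runpy.run_path(str(self.path))


def register(storage: ToyScriptStorage) -> List[str]:
    # Registration keeps only metadata: which tasks exist, not what they do.
    return list(storage.load()["TASKS"])


def run(storage: ToyScriptStorage, registered_tasks: List[str]) -> Dict[str, object]:
    # Each run pulls the latest script from storage and calls the registered tasks.
    namespace = storage.load()
    return {name: namespace[name]() for name in registered_tasks}


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        script = pathlib.Path(tmp) / "flow.py"
        script.write_text("TASKS = ['foo']\n\ndef foo():\n    return 'old logic'\n")
        storage = ToyScriptStorage(script)
        metadata = register(storage)
        print(run(storage, metadata))  # {'foo': 'old logic'}

        # Change only foo's body and "commit" it to storage; no re-registration.
        script.write_text("TASKS = ['foo']\n\ndef foo():\n    return 'new logic'\n")
        print(register(storage) == metadata)  # True: structure unchanged
        print(run(storage, metadata))  # {'foo': 'new logic'}
```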