Eric Feldman
10/19/2021, 8:46 AM
The prefect.core.flow.Flow.serialized_hash documentation says that if the hash of the flow hasn’t changed, the flow won’t be uploaded to the server when calling register
But I get the same hash every time and the server keeps creating new versions of the flow 🤔
Anna Geller (old account)
10/19/2021, 9:06 AM
Eric Feldman
10/19/2021, 9:10 AM
client.register instead of flow.register and I’m facing the same issue
Anna Geller
Eric Feldman
10/19/2021, 9:53 AM
0.15.6
code:
from time import sleep
from typing import Callable

import dask.distributed
import prefect

project_name = 'pname'
client = prefect.Client()
client.create_project(project_name)


def add(x, y):
    prefect.context['logger'].info(f'{x}+{y}')
    return x + y


class ExpTask(prefect.Task):
    def __init__(self, method: Callable):
        super().__init__(on_failure=self.on_failure, name=method.__name__)
        self.method = method

    def run(self, **kwargs):
        prefect.context['logger'].info('sleeping')
        sleep(3)
        prefect.context['logger'].info('running method')
        return self.method(**kwargs)

    def on_failure(self, task, state):
        # todo: how do i get the exception itself?
        logger = prefect.context['logger']
        logger.info(dir(state))
        logger.info('~~~1111~~~')


executor = prefect.executors.DaskExecutor(cluster_class=dask.distributed.LocalCluster,
                                          cluster_kwargs={'processes': False})
with prefect.Flow(name='test', executor=executor, storage=prefect.storage.Local()) as f:
    num1 = ExpTask(add)(x=1, y=2)
    nume2 = ExpTask(add)(x=num1, y=8)
# client.register(f, project_name=project_name)
f.register(project_name=project_name)
thanks!
Anna Geller
if __name__ == "__main__":
    flow.register(
        project_name="your_project_name",
        idempotency_key=flow.serialized_hash()
    )
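A rough sketch of what passing an idempotency key is meant to achieve, using a made-up in-memory registry in place of the server. ToyRegistry and toy_hash are hypothetical names for illustration, not Prefect APIs, and the real server-side logic may differ:

```python
import hashlib
import json


class ToyRegistry:
    """Hypothetical stand-in for server-side version bookkeeping."""

    def __init__(self):
        # flow name -> (current version, idempotency key of last registration)
        self.versions = {}

    def register(self, name, idempotency_key=None):
        version, last_key = self.versions.get(name, (0, None))
        # Same non-empty key as the previous call: treat it as a no-op.
        if idempotency_key is not None and idempotency_key == last_key:
            return version
        self.versions[name] = (version + 1, idempotency_key)
        return version + 1


def toy_hash(serialized_flow):
    """Hash a JSON-serializable description of a flow (illustrative only)."""
    payload = json.dumps(serialized_flow, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


registry = ToyRegistry()
flow_description = {"name": "test", "tasks": ["add", "add"]}

first = registry.register("test", idempotency_key=toy_hash(flow_description))
second = registry.register("test", idempotency_key=toy_hash(flow_description))
third = registry.register("test")  # no key supplied: always a new version
```

In this toy model the second call leaves the version untouched because the key matches, while the call without a key bumps it.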
Eric Feldman
10/19/2021, 10:15 AM
Eric Feldman
10/19/2021, 10:21 AM
build=True to serialized_hash
will it detect code changes as well?
Anna Geller
local_script_path, I would imagine this would upload your flow to S3 before serializing the Flow object and determining whether registration needs to happen. So it should not affect whether reregistration takes place or not. But I will ask the team specifically for your type of storage.
Eric Feldman
10/19/2021, 11:51 AM
build=True
but when I’m passing it, a new hash is generated for the same flow even if no code was changed - same for both local and S3 storages
Anna Geller
build=True, because it’s False by default. Could you try the default implementation and check if registration works as expected this way?
if __name__ == "__main__":
    flow.register(
        project_name="your_project_name",
        idempotency_key=flow.serialized_hash()
    )
What this comment means is that you need this build=True only if you want to be 100% sure that the changes you make inside of your tasks will be reflected in the versioning, regardless of which storage you use.
But for instance, if you add new tasks, or change the order of tasks, this change will always result in a new Flow version, because the structure of the Flow changes.
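To make the structure-versus-task-code distinction concrete, here is a minimal sketch of a hash that only looks at task names and edges. structure_hash is a hypothetical helper, not how Prefect computes serialized_hash; it only illustrates why editing a task body would go unnoticed while adding a task would not:

```python
import hashlib
import json


def structure_hash(task_names, edges):
    """Hash only task names and edges, never the task bodies (toy model)."""
    payload = json.dumps({"tasks": sorted(task_names), "edges": sorted(edges)})
    return hashlib.sha256(payload.encode()).hexdigest()


def add(x, y):
    return x + y


def add_edited(x, y):
    # Different body, standing in for an in-place edit of `add`.
    return y + x + 0


# Pretend the edited function still carries the original task name.
add_edited.__name__ = "add"

before = structure_hash([add.__name__], [])
after_body_edit = structure_hash([add_edited.__name__], [])
after_new_task = structure_hash([add.__name__, "multiply"], [["add", "multiply"]])
```

Here the hash is the same before and after the body edit, and it changes once a second task and an edge are added.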
Overall, if you want to be 100% sure that any change to the flow results in a new version upon registration, I wouldn’t use serialized hash at all and just keep incrementing the versions every time on registration.
To give you a more concrete answer, I asked the team and will get back to you with more info on the impact of this build=True argument.
Eric Feldman
10/19/2021, 12:24 PM
build=True
it updated the version even if nothing has changed
this is why I wanted to run the DAG in the cluster without registering it, it’s kinda a one-time task that might or might not be run again in the future
Anna Geller
Anna Geller
storage.build() is called before the flow is serialized and hashed
• As a result, the serialized hash includes the flow’s storage, so passing build as True/False will change the hash
• it works the same way regardless of the storage type you choose.
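The points above can be sketched with a toy model. It assumes that building storage records something that differs between builds (here, a fresh location string); build_storage and toy_serialized_hash are hypothetical names, and what real Prefect storage classes write during build() is not confirmed in this thread:

```python
import hashlib
import itertools
import json

_build_counter = itertools.count(1)


def build_storage(storage):
    """Toy build step: records a new location on every call (assumption)."""
    built = dict(storage)
    built["location"] = f"flows/test/build-{next(_build_counter)}"
    return built


def toy_serialized_hash(flow, build=False):
    """Hash the serialized flow, building storage first when asked to."""
    serialized = dict(flow)
    if build:
        serialized["storage"] = build_storage(flow["storage"])
    payload = json.dumps(serialized, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


flow = {"name": "test", "tasks": ["add", "add"], "storage": {"type": "Local"}}

# Without building, the same flow hashes to the same value each time.
stable_without_build = toy_serialized_hash(flow) == toy_serialized_hash(flow)

# With building, the storage part differs per call, so the hash differs too.
changes_with_build = (
    toy_serialized_hash(flow, build=True) != toy_serialized_hash(flow, build=True)
)
```

Under that assumption, a hash taken after a build can never match the previous one, which would explain a new version on every registration.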
LMK if something is still unclear.
Eric Feldman
10/19/2021, 1:56 PM
Anna Geller
serialized_hash is used only to detect changes in your flow structure, not changes in your task’s code.
Eric Feldman
10/19/2021, 1:58 PM
Anna Geller
idempotency_key, not the storage.
Anna Geller
serialized_hash is good for.
Eric Feldman
10/19/2021, 4:23 PM
“I believe that as long as you register your flow after every code change”
since the flow isn’t an ETL and is just a “heavy CPU task runner”, I don’t really know when something changes
“Reregistration is only needed if your Flow structure changed in your code”
if I have a flow with 1 task, this task calls method foo and I change foo’s logic - will the flow run the new foo code without registering it?
Anna Geller