Bryce Morrow
12/08/2022, 6:07 PM
persist_result=True, which helps speed up my workflow considerably. For simpler data transformations on the results of these expensive tasks, I don't tend to persist the results.
• For the tasks that I am persisting, their implementations often change as I iterate on ideas. For example, maybe I change the implementation of the task one to return 10 instead of 5. Whenever I change the implementation of one of my tasks with persisted results, I change the cache key. For this example, I'd change lambda *args: "one" to lambda *args: "one1".
A simplified example:
@task(persist_result=True, cache_key_fn=lambda *args: "one")
async def one():
    return 5

@task(persist_result=True, cache_key_fn=lambda *args: "two")
async def two():
    return 2

@flow(cache_result_in_memory=False)
async def flow_test():
    a = await one()
    b = await two()
    return a + b

await flow_test()
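As an aside on the manual key bump described above ("one" to "one1"): one way to avoid editing the string by hand is to derive the key from the task body itself. The following is only a sketch under assumptions. The helper names `versioned_cache_key` and `_code_bytes` are hypothetical and not part of Prefect. It relies only on the fact, visible in the example, that `cache_key_fn` accepts a callable that returns a string.

```python
import hashlib
import types


def _code_bytes(code: types.CodeType) -> bytes:
    """Collect the parts of a code object that change when the body changes."""
    parts = [code.co_code, repr(code.co_names).encode()]
    for const in code.co_consts:
        if isinstance(const, types.CodeType):
            # Nested functions/lambdas: recurse instead of using repr(),
            # which would embed a memory address and differ between sessions.
            parts.append(_code_bytes(const))
        else:
            parts.append(repr(const).encode())
    return b"|".join(parts)


def versioned_cache_key(name: str, fn):
    """Return a cache_key_fn-style callable whose key changes with fn's body.

    Hypothetical helper: `name` plays the role of the hand-written key,
    and the fingerprint replaces the manual "one" -> "one1" bump.
    """
    fingerprint = hashlib.sha256(_code_bytes(fn.__code__)).hexdigest()[:12]

    def cache_key_fn(*args):
        # Ignores its arguments, like the lambdas in the original example.
        return f"{name}-{fingerprint}"

    return cache_key_fn


# Two versions of the same task body, standing in for an edit to `one`.
async def one_v1():
    return 5


async def one_v2():
    return 10


key_v1 = versioned_cache_key("one", one_v1)()
key_v2 = versioned_cache_key("one", one_v2)()
print(key_v1, key_v2, key_v1 != key_v2)
```

To use something like this with `@task`, the undecorated function would need to be defined first so it can be passed to the helper. Note that the fingerprint is built from bytecode, so it would also change when the Python version changes, which invalidates the cache more often than a hand-written key.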
This seems to be working at the start, but eventually, I start running into an issue where tasks fail to run due to an error in decrypting their block data. I'll share the exact error log in the replies. I haven't yet determined the exact cause of the problem but I have some suspicions / notes.
The first concerning thing is that when I re-run this cell, say after changing a task implementation, I get some warnings that look like this:
/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/tasks.py:206: UserWarning: A task named 'one' and defined at '/var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ipykernel_90535/3130328684.py:1' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
`@task(name='my_unique_name', ...)`
warnings.warn(
/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/tasks.py:206: UserWarning: A task named 'two' and defined at '/var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ipykernel_90535/3130328684.py:5' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
And this makes sense, since I'm defining a task with the same name again. However, when I look at the path /var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ipykernel_90535/3130328684.py, it doesn't exist. There are, however, several folders at the path /var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ with names of the form ipykernel_<some_number>. If I look at the contents of these directories, they contain small snippets of code from my notebook, but usually not the task definitions themselves. So I think the ipykernel is storing this information temporarily, but then it changes locations, contents, disappears, etc.
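One possible explanation for the missing file, offered as an assumption about IPython/ipykernel internals rather than something confirmed in this thread: a notebook cell can be compiled under a synthetic filename and its source kept in memory (for example via `linecache`) without anything being written to disk. The sketch below reproduces that effect with the standard library only. The path it builds is made up for illustration and merely imitates the shape of the paths in the warnings.

```python
import linecache
import os
import tempfile

# Source of a "cell", never written to disk.
source = "def one():\n    return 5\n"

# Hypothetical path shaped like the ones in the warnings:
# <tmpdir>/ipykernel_<number>/<number>.py
fake_path = os.path.join(
    tempfile.gettempdir(), f"ipykernel_{os.getpid()}", "3130328684.py"
)

# Compile under the synthetic filename and register the source in linecache,
# so tracebacks and warnings can still show the code.
code = compile(source, fake_path, "exec")
linecache.cache[fake_path] = (
    len(source),              # size
    None,                     # mtime; None means "do not check the file on disk"
    source.splitlines(True),  # the source lines
    fake_path,                # full name
)

namespace = {}
exec(code, namespace)

print(namespace["one"].__code__.co_filename)  # the synthetic path
print(os.path.exists(fake_path))              # the file itself is not on disk
print(linecache.getline(fake_path, 2))        # source is still retrievable
```

If the number in `ipykernel_<some_number>` is tied to the kernel process, it would change on every kernel restart, which would be consistent with the different directory names seen in the warnings (ipykernel_90535) and in the later traceback (ipykernel_94965).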
I've also tested running the notebook end to end, having everything work fine, then restarting the conda kernel and re-running the notebook. This causes the error I'll share below. This strongly suggests to me that the way the kernel is managing the location of the notebook source code is the culprit here.
Another point is that if I clear the orion database using the command prefect orion database reset -y, then my issues are resolved. However, I would like to determine the cause and fix it in some better way.
13:04:33.340 | INFO | prefect.engine - Created flow run 'poised-cobra' for flow 'flow-test'
13:04:33.396 | INFO | Flow run 'poised-cobra' - Created task run 'one-0d9ca636-0' for task 'one'
13:04:33.396 | INFO | Flow run 'poised-cobra' - Executing 'one-0d9ca636-0' immediately...
13:04:33.408 | INFO | Task run 'one-0d9ca636-0' - Finished in state Cached(type=COMPLETED)
13:04:33.414 | ERROR | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 133, in _verify_signature
h.verify(data[-32:])
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/hazmat/primitives/hmac.py", line 72, in verify
ctx.verify(signature)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 85, in verify
raise InvalidSignature("Signature did not match digest.")
cryptography.exceptions.InvalidSignature: Signature did not match digest.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 706, in __call__
await route.handle(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
response = await func(request)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
response = await default_handler(request)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/routing.py", line 235, in app
raw_response = await run_endpoint_function(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/routing.py", line 161, in run_endpoint_function
return await dependant.call(**values)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/api/block_documents.py", line 87, in read_block_document_by_id
block_document = await models.block_documents.read_block_document_by_id(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
return await fn(*args, **kwargs)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/models/block_documents.py", line 133, in read_block_document_by_id
block_documents = await read_block_documents(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
return await fn(*args, **kwargs)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/models/block_documents.py", line 377, in read_block_documents
root_block_document = await BlockDocument.from_orm_model(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/schemas/core.py", line 649, in from_orm_model
data = await orm_block_document.decrypt_data(session=session)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/orm_models.py", line 921, in decrypt_data
return await decrypt_fernet(session, self.data)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/utilities/encryption.py", line 41, in decrypt_fernet
return json.loads(fernet.decrypt(byte_blob).decode())
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 90, in decrypt
return self._decrypt_data(data, timestamp, time_info)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 151, in _decrypt_data
self._verify_signature(data)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 135, in _verify_signature
raise InvalidToken
cryptography.fernet.InvalidToken
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/engine.py", line 617, in orchestrate_flow_run
result = await flow_call()
File "/var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ipykernel_94965/3130328684.py", line 11, in flow_test
a = await one()
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/engine.py", line 965, in get_task_call_return_value
return await future._result()
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/states.py", line 84, in _get_state_result
result = await state.data.get()
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/results.py", line 415, in get
blob = await self._read_blob(client=client)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/results.py", line 425, in _read_blob
block_document = await client.read_block_document(self.storage_block_id)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/client/orion.py", line 1126, in read_block_document
response = await self._client.get(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1757, in get
return await self.request(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1533, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/client/base.py", line 229, in send
response = await self._send_with_retry(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/client/base.py", line 187, in _send_with_retry
response = await request()
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1620, in send
response = await self._send_handling_auth(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
response = await self._send_handling_redirects(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
response = await self._send_single_request(request)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1722, in _send_single_request
response = await transport.handle_async_request(request)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_transports/asgi.py", line 152, in handle_async_request
await self.app(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
await super().__call__(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
await self.middleware_stack(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/cors.py", line 84, in __call__
await self.app(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 706, in __call__
await route.handle(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 443, in handle
await self.app(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
await super().__call__(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
await self.middleware_stack(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 706, in __call__
await route.handle(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
response = await func(request)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
response = await default_handler(request)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/routing.py", line 235, in app
raw_response = await run_endpoint_function(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/routing.py", line 161, in run_endpoint_function
return await dependant.call(**values)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/api/block_documents.py", line 87, in read_block_document_by_id
block_document = await models.block_documents.read_block_document_by_id(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
return await fn(*args, **kwargs)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/models/block_documents.py", line 133, in read_block_document_by_id
block_documents = await read_block_documents(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
return await fn(*args, **kwargs)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/models/block_documents.py", line 377, in read_block_documents
root_block_document = await BlockDocument.from_orm_model(
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/schemas/core.py", line 649, in from_orm_model
data = await orm_block_document.decrypt_data(session=session)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/orm_models.py", line 921, in decrypt_data
return await decrypt_fernet(session, self.data)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/utilities/encryption.py", line 41, in decrypt_fernet
return json.loads(fernet.decrypt(byte_blob).decode())
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 90, in decrypt
return self._decrypt_data(data, timestamp, time_info)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 151, in _decrypt_data
self._verify_signature(data)
File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 135, in _verify_signature
raise InvalidToken
cryptography.fernet.InvalidToken
13:04:33.430 | ERROR | Flow run 'poised-cobra' - Finished in state Failed('Flow run encountered an exception. Traceback (most recent call last):\n File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 133, in _verify_signature\n h.verify(data[-32:])\n File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/hazmat/primitives/hmac.py", line 72, in verify\n ctx.verify(signature)\n File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 85, in verify\n raise InvalidSignature("Signature did not match digest.")\ncryptography.exceptions.InvalidSignature: Signature did not match digest.\n\nDuring handling of the above exception, another exception occurred:\n\ncryptography.fernet.InvalidToken\n')
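For readers unfamiliar with the innermost error in the traceback above ("Signature did not match digest", surfaced as InvalidToken): a Fernet token carries an HMAC over its contents, and the check fails when the verifying key differs from the signing key or when the stored bytes were altered. The following is a minimal stdlib illustration of that failure mode only. It is not Prefect's or `cryptography`'s actual code, the `sign` and `verify` helpers are hypothetical, and it does not establish which of the two causes applies here.

```python
import hashlib
import hmac
import os

TAG_LEN = 32  # SHA-256 digest size, matching data[-32:] in the traceback


def sign(key: bytes, payload: bytes) -> bytes:
    """Append an HMAC-SHA256 tag to the payload (hypothetical helper)."""
    tag = hmac.new(key, payload, hashlib.sha256).digest()
    return payload + tag


def verify(key: bytes, token: bytes) -> bytes:
    """Return the payload if the tag matches, otherwise raise ValueError."""
    payload, tag = token[:-TAG_LEN], token[-TAG_LEN:]
    expected = hmac.new(key, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(tag, expected):
        raise ValueError("Signature did not match digest.")
    return payload


key_at_write_time = os.urandom(32)
key_at_read_time = os.urandom(32)  # a different key, e.g. regenerated later

token = sign(key_at_write_time, b'{"some": "block data"}')

# Same key: verification succeeds and the payload comes back.
print(verify(key_at_write_time, token))

# Different key: verification fails, analogous to InvalidToken above.
try:
    verify(key_at_read_time, token)
except ValueError as exc:
    print("failed:", exc)
```

Read this way, the error would mean the stored block data was written under one key and read back under another, or was modified in between; the sketch cannot say which happened in this case.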