
Bryce Morrow

12/08/2022, 6:07 PM
Hey everyone, I've been playing around with Prefect the last few days and really enjoying it so far. However, I'm running into some issues with my workflow / setup and would appreciate some feedback / help 🙂

My environment is set up like this:
• I'm working in a jupyter-lab environment (installed globally).
• I have a Python 3.10.8 conda kernel installed in this environment, and I'm using this kernel to run my notebooks.

My workflow is like this:
• Define a number of tasks. Some tasks involve expensive API calls, so for these I set `persist_result=True`, which speeds up my workflow considerably. For simpler data transformations on the results of these expensive tasks, I don't tend to persist the results.
• For the tasks that I am persisting, their implementations often change as I iterate on ideas. So, for example, maybe I change the implementation of the task `one` to return `10` instead of `5`. Whenever I change the implementation of one of my tasks with persisted results, I change the cache key. For this example, I'd change `lambda *args: "one"` to `lambda *args: "one1"`.

A simplified example:
@task(persist_result=True, cache_key_fn=lambda *args: "one")
async def one(): 
    return 5

@task(persist_result=True, cache_key_fn=lambda *args: "two")
async def two(): 
    return 2

@flow(cache_result_in_memory=False)
async def flow_test():
    a = await one()
    b = await two()
    return a + b 

await flow_test()
This seems to be working at the start, but eventually, I start running into an issue where tasks fail to run due to an error in decrypting their block data. I'll share the exact error log in the replies. I haven't yet determined the exact cause of the problem but I have some suspicions / notes. The first concerning thing is that when I re-run this cell, say after changing a task implementation, I get some warnings that look like this:
/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/tasks.py:206: UserWarning: A task named 'one' and defined at '/var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ipykernel_90535/3130328684.py:1' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(
/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/tasks.py:206: UserWarning: A task named 'two' and defined at '/var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ipykernel_90535/3130328684.py:5' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
And this makes sense, since I'm defining a task with the same name again. However, when I look at the path `/var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ipykernel_90535/3130328684.py`, it doesn't exist. However, there are different folders stored at the path `/var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/` with names of the form `ipykernel_<some_number>`. If I look at the contents of these directories, they contain small snippets of code from my notebook, but they usually aren't the task definitions themselves. So I think the ipykernel is storing this information temporarily but then it changes locations, contents, disappears, etc.

I've also tested running the notebook end to end, having everything work fine, then restarting the conda kernel and re-running the notebook. This causes the error I'll share below. This strongly suggests to me that the way the kernel is managing the location of the notebook source code is the culprit here.

Another point is that if I clear the Orion database using the command `prefect orion database reset -y`, then my issues are resolved. However, I would like to determine the issue and fix it in some better way.
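For readers following along, the manual key bump described above (changing "one" to "one1") could be made a little more explicit with a small helper. This is only a sketch: `versioned_cache_key` is a hypothetical name, not part of Prefect, and it assumes Prefect calls `cache_key_fn` with two positional arguments (the task run context and the parameters dict). Like the original lambdas, it ignores both.

```python
from typing import Any, Callable, Dict


def versioned_cache_key(name: str, version: int) -> Callable[[Any, Dict[str, Any]], str]:
    """Build a cache key function whose key changes whenever `version` is bumped.

    Hypothetical helper for illustration; not a Prefect API.
    """

    def cache_key_fn(context: Any, parameters: Dict[str, Any]) -> str:
        # Both arguments are ignored, mirroring `lambda *args: "one"` in the post.
        return f"{name}-v{version}"

    return cache_key_fn


# Bumping the version yields a different key, so an old persisted result is not reused.
key_v1 = versioned_cache_key("one", 1)
key_v2 = versioned_cache_key("one", 2)
print(key_v1(None, {}))
print(key_v2(None, {}))
```

In the notebook this would be passed as `cache_key_fn=versioned_cache_key("one", 2)` in the `@task(...)` decorator. It does not address the decryption error; it only keeps the versioning intent visible in one place.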
And here's the error message (broken into two messages since it's pretty long)
13:04:33.340 | INFO    | prefect.engine - Created flow run 'poised-cobra' for flow 'flow-test'
13:04:33.396 | INFO    | Flow run 'poised-cobra' - Created task run 'one-0d9ca636-0' for task 'one'
13:04:33.396 | INFO    | Flow run 'poised-cobra' - Executing 'one-0d9ca636-0' immediately...
13:04:33.408 | INFO    | Task run 'one-0d9ca636-0' - Finished in state Cached(type=COMPLETED)
13:04:33.414 | ERROR   | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 133, in _verify_signature
    h.verify(data[-32:])
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/hazmat/primitives/hmac.py", line 72, in verify
    ctx.verify(signature)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 85, in verify
    raise InvalidSignature("Signature did not match digest.")
cryptography.exceptions.InvalidSignature: Signature did not match digest.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 706, in __call__
    await route.handle(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
    response = await func(request)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/routing.py", line 235, in app
    raw_response = await run_endpoint_function(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/routing.py", line 161, in run_endpoint_function
    return await dependant.call(**values)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/api/block_documents.py", line 87, in read_block_document_by_id
    block_document = await models.block_documents.read_block_document_by_id(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/models/block_documents.py", line 133, in read_block_document_by_id
    block_documents = await read_block_documents(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/models/block_documents.py", line 377, in read_block_documents
    root_block_document = await BlockDocument.from_orm_model(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/schemas/core.py", line 649, in from_orm_model
    data = await orm_block_document.decrypt_data(session=session)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/orm_models.py", line 921, in decrypt_data
    return await decrypt_fernet(session, self.data)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/utilities/encryption.py", line 41, in decrypt_fernet
    return json.loads(fernet.decrypt(byte_blob).decode())
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 90, in decrypt
    return self._decrypt_data(data, timestamp, time_info)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 151, in _decrypt_data
    self._verify_signature(data)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 135, in _verify_signature
    raise InvalidToken
cryptography.fernet.InvalidToken
During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/engine.py", line 617, in orchestrate_flow_run
    result = await flow_call()
  File "/var/folders/5c/hf9_blgj3jbbxxvkw_s9snsw0000gn/T/ipykernel_94965/3130328684.py", line 11, in flow_test
    a = await one()
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/engine.py", line 965, in get_task_call_return_value
    return await future._result()
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/states.py", line 84, in _get_state_result
    result = await state.data.get()
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/results.py", line 415, in get
    blob = await self._read_blob(client=client)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/results.py", line 425, in _read_blob
    block_document = await client.read_block_document(self.storage_block_id)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/client/orion.py", line 1126, in read_block_document
    response = await self._client.get(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1757, in get
    return await self.request(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/client/base.py", line 229, in send
    response = await self._send_with_retry(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/client/base.py", line 187, in _send_with_retry
    response = await request()
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/httpx/_transports/asgi.py", line 152, in handle_async_request
    await self.app(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 706, in __call__
    await route.handle(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 443, in handle
    await self.app(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 706, in __call__
    await route.handle(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/starlette/routing.py", line 66, in app
    response = await func(request)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/routing.py", line 235, in app
    raw_response = await run_endpoint_function(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/fastapi/routing.py", line 161, in run_endpoint_function
    return await dependant.call(**values)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/api/block_documents.py", line 87, in read_block_document_by_id
    block_document = await models.block_documents.read_block_document_by_id(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/models/block_documents.py", line 133, in read_block_document_by_id
    block_documents = await read_block_documents(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/models/block_documents.py", line 377, in read_block_documents
    root_block_document = await BlockDocument.from_orm_model(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/schemas/core.py", line 649, in from_orm_model
    data = await orm_block_document.decrypt_data(session=session)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/database/orm_models.py", line 921, in decrypt_data
    return await decrypt_fernet(session, self.data)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/prefect/orion/utilities/encryption.py", line 41, in decrypt_fernet
    return json.loads(fernet.decrypt(byte_blob).decode())
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 90, in decrypt
    return self._decrypt_data(data, timestamp, time_info)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 151, in _decrypt_data
    self._verify_signature(data)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 135, in _verify_signature
    raise InvalidToken
cryptography.fernet.InvalidToken
13:04:33.430 | ERROR   | Flow run 'poised-cobra' - Finished in state Failed('Flow run encountered an exception. Traceback (most recent call last):\n  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/fernet.py", line 133, in _verify_signature\n    h.verify(data[-32:])\n  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/hazmat/primitives/hmac.py", line 72, in verify\n    ctx.verify(signature)\n  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/cryptography/hazmat/backends/openssl/hmac.py", line 85, in verify\n    raise InvalidSignature("Signature did not match digest.")\ncryptography.exceptions.InvalidSignature: Signature did not match digest.\n\nDuring handling of the above exception, another exception occurred:\n\ncryptography.fernet.InvalidToken\n')
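A note on reading the traceback: the innermost failure is an HMAC check on the last 32 bytes of the stored token (`h.verify(data[-32:])`), which Fernet reports as `InvalidToken`. One way such a check can fail is when the key used to verify differs from the key used to sign. The thread does not establish that this is what happened here. The sketch below is a simplified stand-in using only the standard library, not Fernet itself (real Fernet tokens also carry a version byte, a timestamp, and AES-encrypted data), and `sign` and `verify` are illustrative names.

```python
import hashlib
import hmac


def sign(key: bytes, payload: bytes) -> bytes:
    """Append a 32-byte HMAC-SHA256 signature to the payload."""
    return payload + hmac.new(key, payload, hashlib.sha256).digest()


def verify(key: bytes, token: bytes) -> bool:
    """Recompute the signature over the payload and compare it to the last 32 bytes."""
    payload, signature = token[:-32], token[-32:]
    expected = hmac.new(key, payload, hashlib.sha256).digest()
    return hmac.compare_digest(signature, expected)


# Two different keys, fixed here so the example is deterministic.
key_a = b"a" * 32
key_b = b"b" * 32

token = sign(key_a, b"block document data")
same_key_ok = verify(key_a, token)    # verified with the key that signed it
other_key_ok = verify(key_b, token)   # verified with a different key
print(same_key_ok, other_key_ok)
```

If a key mismatch were the cause, it would be consistent with the observation that resetting the database clears the problem, since the old encrypted block documents are discarded along with everything else. That remains a hypothesis rather than a diagnosis.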