<@ULVA73B9P> Encountered exception during executio...
# ask-marvin
n
@Marvin Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 861, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/opt/prefect/prefect_data_pipeline-main/llm_data_collection/llm_data_collection.py", line 194, in llm_orchestrator_15k
    results = [await run.state.result().get() for run in worker_flow_runs]
  File "/opt/prefect/prefect_data_pipeline-main/llm_data_collection/llm_data_collection.py", line 194, in <listcomp>
    results = [await run.state.result().get() for run in worker_flow_runs]
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/results.py", line 514, in get
    blob = await self._read_blob(client=client)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/results.py", line 529, in _read_blob
    content = await storage_block.read_path(self.storage_key)
  File "/usr/local/lib/python3.10/site-packages/prefect/filesystems.py", line 217, in read_path
    raise ValueError(f"Path {path} does not exist.")
ValueError: Path /root/.prefect/storage/1d3c12ab18fe4439ab2d79a848e9fdbe does not exist.
021835 AM prefect.flow_runs ERROR Finished in state Failed('Flow run encountered an exception. ValueError: Path /root/.prefect/storage/1d3c12ab18fe4439ab2d79a848e9fdbe does not exist.')
m
Your message was too long by 235 tokens - please shorten it and try again. For reference, here's your message at the allowed limit:
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 861, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/opt/prefect/prefect_data_pipeline-main/llm_data_collection/llm_data_collection.py", line 194, in llm_orchestrator_15k
    results = [await run.state.result().get() for run in worker_flow_runs]
  File "/opt/prefect/prefect_data_pipeline-main/llm_data_collection/llm_data_collection.py", line 194, in <listcomp>
    results = [await run.state.result().get() for run in worker_flow_runs]
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/results
n
@Marvin ValueError: Path /root/.prefect/storage/1d3c12ab18fe4439ab2d79a848e9fdbe does not exist. ERROR Finished in state Failed('Flow run encountered an exception. ValueError: Path /root/.prefect/storage/1d3c12ab18fe4439ab2d79a848e9fdbe does not exist.')
m
The problem you're encountering relates to task result caching on ephemeral infrastructure, such as Kubernetes Pods or Docker containers. When a run in one container tries to use a result that was cached by another, Prefect expects the path under `PREFECT_LOCAL_STORAGE_PATH` to be accessible. If it isn't, because a container's local filesystem does not outlive the container, you get a `ValueError` like the one you're seeing. The reported issue on GitHub (#8637) indicates that using Prefect's `task_input_hash` unmodified is currently unsafe in such environments because it does not include infrastructure-identifying inputs. To work around this, consider implementing a custom `cache_key_fn`, especially when working with containers. For instance, you might include the Pod UID, exposed through the Kubernetes Downward API, in the cache key. Here's an example:
`deployment.yaml` change:
```yaml
  job:
    ...
    spec:
      template:
        spec:
          containers:
          - name: prefect-job
            env:
              ...
              - name: POD_UID
                valueFrom:
                  fieldRef:
                    fieldPath: metadata.uid
```
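Custom cache key function:
```python
import os
from typing import Any
from prefect.context import TaskRunContext
from prefect.utilities.hashing import hash_objects

def custom_cache_key_fn(context: TaskRunContext, arguments: dict[str, Any]) -> str | None:
    # Hash the task identity, its bytecode, the Pod UID, and the call arguments together,
    # so a key computed in one Pod never matches a key computed in another.
    return hash_objects(
        context.task.task_key,
        context.task.fn.__code__.co_code.hex(),
        # POD_UID is injected by the deployment.yaml change; the default applies outside Kubernetes.
        os.getenv("POD_UID", "NO_POD_UID_SPECIFIED"),
        arguments,
    )
```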
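To see why mixing the Pod UID into the key helps, here is a minimal standard-library sketch of the same idea. It does not use Prefect at all: `pod_scoped_cache_key` and its `pod_uid` parameter are hypothetical names for illustration, and `hashlib` plus `json` stand in for `hash_objects`. The same task and arguments produce the same key inside one Pod and a different key in another Pod, so a result cached on one Pod's local disk is never looked up from a different one.

```python
import hashlib
import json
import os
from typing import Any, Optional


def pod_scoped_cache_key(
    task_key: str,
    arguments: dict[str, Any],
    pod_uid: Optional[str] = None,
) -> str:
    """Hypothetical stand-in for a cache key function; not part of Prefect."""
    if pod_uid is None:
        # Same environment variable and default as in the example above.
        pod_uid = os.getenv("POD_UID", "NO_POD_UID_SPECIFIED")
    # Serialize deterministically so equal inputs always hash to the same key.
    payload = json.dumps(
        {"task_key": task_key, "pod_uid": pod_uid, "arguments": arguments},
        sort_keys=True,
        default=str,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


args = {"x": 1, "y": [2, 3]}
key_pod_a = pod_scoped_cache_key("my_task", args, pod_uid="pod-a")
key_pod_a_again = pod_scoped_cache_key("my_task", args, pod_uid="pod-a")
key_pod_b = pod_scoped_cache_key("my_task", args, pod_uid="pod-b")
```

The trade-off is that the cache is only reused within a single Pod's lifetime, which is usually what you want when results are written to that Pod's local filesystem.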
Additionally, there's a related issue (#6086): Prefect does not fall back to recomputing a task result when the expected cached data is missing from the filesystem. A fix could involve improving this fallback behavior and providing clearer guidance for caching in ephemeral environments. For now, use a custom cache key function that accounts for the environment changes specific to ephemeral infrastructure.
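Until that fallback exists, you could approximate it in your own code. The sketch below is an assumption-laden illustration, not Prefect behavior: `get_or_recompute`, `fetch`, and `recompute` are hypothetical names, and the only detail taken from your traceback is that the missing file surfaces as a `ValueError` whose message ends in "does not exist.". In your flow, `fetch` would wrap something like `run.state.result().get()` and `recompute` would rerun the work.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def get_or_recompute(
    fetch: Callable[[], Awaitable[Any]],
    recompute: Callable[[], Awaitable[Any]],
) -> Any:
    """Hypothetical helper: return the stored result, or recompute if its file is gone."""
    try:
        return await fetch()
    except ValueError as exc:
        # Only treat the "Path ... does not exist." error as a cache miss;
        # any other ValueError is re-raised unchanged.
        if "does not exist" not in str(exc):
            raise
        return await recompute()


# Stand-ins for illustration only; they do not touch Prefect or the filesystem.
async def _missing_result() -> Any:
    raise ValueError("Path /tmp/example-missing-result does not exist.")


async def _stored_result() -> Any:
    return "stored"


async def _recomputed_result() -> Any:
    return "recomputed"


fallback_value = asyncio.run(get_or_recompute(_missing_result, _recomputed_result))
stored_value = asyncio.run(get_or_recompute(_stored_result, _recomputed_result))
```

Matching on the error message is brittle, so treat this as a stopgap rather than a substitute for a cache key that is safe on ephemeral infrastructure.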