nurul akter towhid
02/07/2024, 3:44 AM
Marvin
02/07/2024, 3:44 AM
```
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 861, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/opt/prefect/prefect_data_pipeline-main/llm_data_collection/llm_data_collection.py", line 194, in llm_orchestrator_15k
    results = [await run.state.result().get() for run in worker_flow_runs]
  File "/opt/prefect/prefect_data_pipeline-main/llm_data_collection/llm_data_collection.py", line 194, in <listcomp>
    results = [await run.state.result().get() for run in worker_flow_runs]
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/results
```
nurul akter towhid
02/07/2024, 3:45 AM
Marvin
02/07/2024, 3:46 AM
… `PREFECT_LOCAL_STORAGE_PATH` to be accessible. If it isn't, which can easily happen because containers are ephemeral, you get a `ValueError` similar to the one you're facing.
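The GitHub issue #8637 reports that using Prefect's `task_input_hash` unmodified is currently unsafe in such environments, because the key it produces does not include any input that identifies the infrastructure. A new container that runs the same task with the same inputs therefore computes the same key and tries to load a result that was written to a previous container's local storage.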
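To make the collision concrete, here is a minimal sketch in plain Python. It does not call Prefect: `input_only_key` and `infra_aware_key` are hypothetical stand-ins that hash JSON-serialized values with `hashlib.sha256`, which only approximates what `task_input_hash` does. The task name `fetch-data`, the `batch` argument, and the pod identifiers are also made up for illustration.

```python
import hashlib
import json
from typing import Any


def _digest(*parts: Any) -> str:
    """Hash JSON-serializable parts into a hex digest (stand-in for Prefect's hashing)."""
    payload = json.dumps(parts, sort_keys=True, default=str).encode()
    return hashlib.sha256(payload).hexdigest()


def input_only_key(task_key: str, arguments: dict[str, Any]) -> str:
    # Hypothetical stand-in for an input-only key such as task_input_hash:
    # nothing here identifies the pod or container that ran the task.
    return _digest(task_key, arguments)


def infra_aware_key(task_key: str, arguments: dict[str, Any], pod_uid: str) -> str:
    # Same inputs plus an infrastructure identifier (e.g. a Kubernetes Pod UID).
    return _digest(task_key, arguments, pod_uid)


args = {"batch": 15_000}

# Two different pods run the same task with the same arguments.
key_pod_a = input_only_key("fetch-data", args)
key_pod_b = input_only_key("fetch-data", args)
print(key_pod_a == key_pod_b)  # True: pod B would reuse pod A's cache entry

aware_a = infra_aware_key("fetch-data", args, pod_uid="pod-a")
aware_b = infra_aware_key("fetch-data", args, pod_uid="pod-b")
print(aware_a == aware_b)  # False: each pod gets its own cache entry
```

The infra-aware key still matches within a single pod, so retries in the same container keep their cache hits.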
To work around this, consider implementing a custom `cache_key_fn`, especially when running in containers. One option is to include an infrastructure identifier in the key, such as the Pod UID that Kubernetes exposes through the Downward API. Here's an example:
`deployment.yaml` change:
```yaml
job:
  # ...
  spec:
    template:
      spec:
        containers:
          - name: prefect-job
            env:
              # ...
              - name: POD_UID
                valueFrom:
                  fieldRef:
                    fieldPath: metadata.uid
```
Custom cache key function:
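```python
import os
from typing import Any

from prefect.context import TaskRunContext
from prefect.utilities.hashing import hash_objects


def custom_cache_key_fn(context: TaskRunContext, arguments: dict[str, Any]) -> str | None:
    """Cache key that also varies with the pod the task runs in."""
    return hash_objects(
        context.task.task_key,                         # identifies the task
        context.task.fn.__code__.co_code.hex(),        # changes when the task's bytecode changes
        os.getenv("POD_UID", "NO_POD_UID_SPECIFIED"),  # set via the Downward API; differs per pod
        arguments,                                     # the task's inputs, as with task_input_hash
    )


# Usage: @task(cache_key_fn=custom_cache_key_fn)
```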
A related issue (#6086) reports that Prefect does not fall back to recomputing a task's result when the cached data it expects is missing from the filesystem. Resolving it would likely involve adding that fallback and providing clearer guidance on caching in ephemeral environments.
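The fallback that #6086 asks for can be illustrated outside Prefect. The sketch below is plain Python, not Prefect's API: a hypothetical `cached_call` helper stores each result as a JSON file named after its cache key, and if that file is missing (as it would be after the container that wrote it is gone) it recomputes and rewrites the result instead of raising.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Callable


def cached_call(cache_dir: Path, key: str, compute: Callable[[], Any]) -> Any:
    """Return the cached result for `key`, recomputing it if the file is missing."""
    path = cache_dir / f"{key}.json"
    try:
        # Cache hit: the result file exists on this filesystem.
        return json.loads(path.read_text())
    except FileNotFoundError:
        # The stored result is gone (e.g. it lived on a previous container's
        # local disk): recompute and persist it instead of failing.
        result = compute()
        path.write_text(json.dumps(result))
        return result


calls = []


def expensive() -> int:
    calls.append(1)  # count how many times the real work runs
    return 42


with tempfile.TemporaryDirectory() as tmp:
    cache_dir = Path(tmp)
    first = cached_call(cache_dir, "abc", expensive)   # computes and stores
    second = cached_call(cache_dir, "abc", expensive)  # served from the file
    (cache_dir / "abc.json").unlink()                  # simulate lost storage
    third = cached_call(cache_dir, "abc", expensive)   # recomputes

print(first, second, third, len(calls))  # 42 42 42 2
```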
For now, use a custom cache key function that accounts for what changes between instances of ephemeral infrastructure, such as the pod a task runs in.