https://prefect.io logo
Title
s

Shirley Monroe

02/07/2023, 5:00 PM
Hi folks! I have a flow deployment with type
kubernetes-job
that I want to be able to enable task result caching so all the subflows within the same pod could reuse results. I think I need to set up the pods to have a volume mounted for storage; how can I specify that in the deployment config? Also, there's one task that would be called with the same parameters from multiple concurrent flows. It'd be awesome to be able to cache that result & have it available across pods, but barring that, if I set up caching for that task would it be expected to cache correctly independently for all the pods? 🤞
c

Christopher Boyd

02/08/2023, 2:33 PM
Hi shirley - this is a pretty advanced case. You would need to configure persistent volumes and persistent volume claims on your cluster , then mount these into your job spec when you create the deployment
s

Shirley Monroe

02/08/2023, 2:42 PM
Hmm, OK, what about just caching within each pod? If I try to enable task caching I get error messages like
ValueError: Path /root/.prefect/storage/8fc55773dcb94a4d9aa4c22a35cd5e9b does not exist.
I know it's possible to create a writeable volume & mount it when you create a pod, and then setting
PREFECT_LOCAL_STORAGE_PATH
should let the cache use it; but I don't know how to get the volume-creation/mounting commands applied to pods created by my k8s agent. I'm assuming the deployment spec has to do it since that's where I had to specify the namespace, service account, etc.
So far my team has been running
prefect deployment build ...
to generate the yaml each time, with a list of override arguments that's honestly getting a bit out of hand; I think I can see how to edit the yaml to add this stuff, but I really can't see how to supply the required overrides as arguments.
actually, no, I was wrong about where to put things in the yaml even editing manually 😞
c

Christopher Boyd

02/08/2023, 6:38 PM
I think generally this is the value of using the python api to create your deployments via Deployment.build_from_flow
an example would look like:
>>> from my_project.flows import my_flow
>>> from prefect.deployments import Deployment
>>>
>>> deployment = Deployment.build_from_flow(
...     flow=my_flow,
...     name="example",
...     version="1",
...     tags=["demo"],
>>> )
>>> deployment.apply()
It’s much more static, and convenient, just building from a python file, rather than building via cli every time
s

Shirley Monroe

02/09/2023, 2:41 PM
well I got an emptyDir volume created & mounted & used for local storage, but I'm still getting errors for cached tasks, even when I only cache ones that are only called with matching params within the same pod
relevant bits of deployment yaml:
infrasturcture:
  env: {
    PREFECT_LOCAL_STORAGE_PATH: /cache
  }
  job:
    spec:
      template:
        spec:
          containers:
          - name: prefect-job
            volumeMounts:
            - name: prefect-cache
              mountPath: /cache
          volumes:
          - name: prefect-cache
            emptyDir: {}
about 2% of my flows work; the rest fail with errors like
ValueError: Path /cache/cfa8730c00b14ec5ba89d4db9ae5740c does not exist.
while attempting to read the result from the cache
c

Christopher Boyd

02/09/2023, 2:57 PM
When you are re-using results, do you need the actual OUTPUT of the task (e.g. a file that gets written or something), or just the state that it was previously completed? If it’s the former, you’ll need to decide how you want to persist the results and the naming / file name scheme to
/cache
. If it’s the latter, this should just be using your
cache_fn_key
and stored server side when you re-run the task with the same inputs etc.
s

Shirley Monroe

02/09/2023, 3:22 PM
these tasks just return a result; I thought that result would be cached as part of task caching. is that not true?
(I do need the result, not just the fact that the task completed)
g

George Coyne

02/16/2023, 8:10 PM
From your error, Prefect has been informed of a path in which a result was written, can you confirm that the file is present on the volume?
I personally prefer using object storage as opposed to managing volume mounts
@flow(
    name="My Demo Flow",
    persist_result=True, # Default is True
    result_storage=S3Bucket.load("result-storage"),
)
j

Jai P

02/17/2023, 3:44 PM
Hi @George Coyne, thanks for the response! Shirley is out but i'll be spending some time trying to re-create what they were seeing, and will let you know if i have further updates 🙂
✅ 1
Hi @George Coyne, following up here with what i've discovered: basically what ends up happening is that the
task_input_hash
function doesn't include any infrastructure identifying information as an input into the hash, so you end up getting a hash that is shared between two flows running on separate pods in a k8s cluster. In the example below:
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=5))
def my_cached_task():
    return random.random()

def my_flow():
    my_cached_task.submit()

if __name__ == "__main__":
    # both of these trigger a deployment of my_flow which is configured as a kubernetes-job
    run_deployment('my_flow')
    run_deployment('my_flow')
the second flow_run will fail because it runs on a second pod that won't have access to the storage location associated to the cache key computed in the first flow run