Shirley Monroe
02/07/2023, 5:00 PM
I have a deployment that runs as a kubernetes-job, and I want to be able to enable task result caching so all the subflows within the same pod can reuse results. I think I need to set up the pods to have a volume mounted for storage; how can I specify that in the deployment config?
Also, there's one task that would be called with the same parameters from multiple concurrent flows. It'd be awesome to be able to cache that result & have it available across pods; but barring that, if I set up caching for that task, would it be expected to cache correctly and independently on each pod? 🤞
Christopher Boyd
02/08/2023, 2:33 PM
Shirley Monroe
02/08/2023, 2:42 PM
ValueError: Path /root/.prefect/storage/8fc55773dcb94a4d9aa4c22a35cd5e9b does not exist.
I know it's possible to create a writeable volume & mount it when you create a pod, and then setting PREFECT_LOCAL_STORAGE_PATH
should let the cache use it; but I don't know how to get the volume-creation/mounting commands applied to pods created by my k8s agent. I'm assuming the deployment spec has to do it, since that's where I had to specify the namespace, service account, etc.
Shirley Monroe
02/08/2023, 3:01 PM
Right now I'm running prefect deployment build ...
to generate the yaml each time, with a list of override arguments that's honestly getting a bit out of hand; I think I can see how to edit the yaml to add this stuff, but I really can't see how to supply the required overrides as arguments.
Shirley Monroe
02/08/2023, 3:53 PM
Christopher Boyd
02/08/2023, 6:38 PM
Christopher Boyd
02/08/2023, 6:38 PM
Christopher Boyd
02/08/2023, 6:39 PM
>>> from my_project.flows import my_flow
>>> from prefect.deployments import Deployment
>>>
>>> deployment = Deployment.build_from_flow(
...     flow=my_flow,
...     name="example",
...     version="1",
...     tags=["demo"],
... )
>>> deployment.apply()
It’s much more static, and convenient, just building from a python file rather than building via the CLI every time.
Shirley Monroe
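The build_from_flow approach above also takes an infra_overrides mapping, which is one way to attach the PREFECT_LOCAL_STORAGE_PATH setting to the deployment itself. A minimal sketch, assuming a Prefect 2.x install; the dotted override key mirrors the env block discussed later in this thread, and the commented Deployment call is illustrative, not a verified command:

```python
# Dotted keys patch fields on the deployment's infrastructure block;
# this one sets the cache path env var on the job's pods.
infra_overrides = {"env.PREFECT_LOCAL_STORAGE_PATH": "/cache"}

# With prefect installed, it would be passed roughly like this (sketch):
# from prefect.deployments import Deployment
# deployment = Deployment.build_from_flow(
#     flow=my_flow,
#     name="example",
#     infra_overrides=infra_overrides,
# )
# deployment.apply()
```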
02/09/2023, 2:41 PM
Shirley Monroe
02/09/2023, 2:44 PM
infrastructure:
  env:
    PREFECT_LOCAL_STORAGE_PATH: /cache
  job:
    spec:
      template:
        spec:
          containers:
            - name: prefect-job
              volumeMounts:
                - name: prefect-cache
                  mountPath: /cache
          volumes:
            - name: prefect-cache
              emptyDir: {}
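A quick way to confirm a mount like this actually took effect is to probe the storage path from inside the container. A sketch for illustration: it falls back to the system temp directory when PREFECT_LOCAL_STORAGE_PATH isn't set, so it also runs outside the pod, and the probe filename is arbitrary:

```python
import os
import pathlib
import tempfile

# Read the storage path the cache will use; fall back to the temp dir
# so this probe also runs outside the pod (assumption for illustration).
cache_dir = pathlib.Path(
    os.environ.get("PREFECT_LOCAL_STORAGE_PATH", tempfile.gettempdir())
)
cache_dir.mkdir(parents=True, exist_ok=True)

# Write and read back a marker file to prove the volume is writable.
probe = cache_dir / "cache-probe.txt"
probe.write_text("ok")
print(probe.read_text())  # prints "ok" if the mount is writable
```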
Shirley Monroe
02/09/2023, 2:50 PM
ValueError: Path /cache/cfa8730c00b14ec5ba89d4db9ae5740c does not exist.
while attempting to read the result from the cache.
Christopher Boyd
02/09/2023, 2:57 PM
Is the result being written to the default local storage path, or to /cache
? If it’s the latter, this should just be using your cache_key_fn
and be stored server side when you re-run the task with the same inputs etc.
Shirley Monroe
02/09/2023, 3:22 PM
Shirley Monroe
02/09/2023, 3:25 PM
George Coyne
02/16/2023, 8:10 PM
George Coyne
02/16/2023, 8:13 PM
from prefect import flow
from prefect_aws import S3Bucket  # imports and stub added for completeness

@flow(
    name="My Demo Flow",
    persist_result=True,  # Default is True
    result_storage=S3Bucket.load("result-storage"),
)
def my_demo_flow():
    ...
Jai P
02/17/2023, 3:44 PM
Jai P
02/21/2023, 6:14 PM
The task_input_hash
function doesn't include any infrastructure-identifying information as an input into the hash, so you end up getting a hash that is shared between two flows running on separate pods in a k8s cluster. In the example below:
import random
from datetime import timedelta
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=5))
def my_cached_task():
    return random.random()

@flow
def my_flow():
    my_cached_task.submit()

if __name__ == "__main__":
    # both of these trigger a deployment of my_flow which is configured as a kubernetes-job
    run_deployment('my_flow')
    run_deployment('my_flow')
the second flow_run will fail because it runs on a second pod that won't have access to the storage location associated with the cache key computed in the first flow run.
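Until results live in shared storage (like the S3 block suggested above), one workaround is a custom cache_key_fn that scopes keys to the host, so a pod never resolves a key that points at another pod's local storage. A sketch with a plain-Python stand-in for the hashing; the function name and hashing details are illustrative, not Prefect's own task_input_hash:

```python
import hashlib
import json
import socket

def host_scoped_cache_key(context, parameters):
    # Hash the task's parameters deterministically (stand-in for
    # task_input_hash; `context` matches the cache_key_fn signature
    # but is unused here).
    digest = hashlib.sha256(
        json.dumps(parameters, sort_keys=True, default=str).encode()
    ).hexdigest()
    # Prefix with the hostname so cache keys are never shared across
    # pods whose local storage paths are not shared.
    return f"{socket.gethostname()}-{digest}"

# Usage (sketch): @task(cache_key_fn=host_scoped_cache_key, ...)
```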