Cab Maddux

03/23/2020, 2:59 PM
Hi, looking to clarify a few details about persistence and caching. Given Flow `A` containing task `X`, which has input `arg1`, takes a long time to run and returns a string. It uses `GCSResultHandler`, with `cache_for=datetime.timedelta(hours=1)` and `cache_validator=prefect.engine.cache_validators.all_inputs`:
1. A run of flow `A` is triggered and the input for task `X` is `arg1='abc'` and the output is `applebananacarrot`.
2. 5 minutes later another run of flow `A` is triggered and the input for task `X` is `arg1='def'` and the output is `danceearflight`.
3. 5 minutes later another run of flow `A` is triggered and the input for task `X` is `arg1='abc'`.
After #1 we'll have a cached value for task `X` and input `abc` pointing to the GCS URI storing pickled `applebananacarrot`. After #2 we'll have an additional cached value for task `X` and input `def` pointing to the GCS URI storing pickled `danceearflight`. Then #3 will pull the GCS URI for pickled `applebananacarrot` from cache. Is that correct?

josh

03/23/2020, 3:07 PM
In this scenario does #2 happen while #1 is still running and has not completed?

Cab Maddux

03/23/2020, 3:10 PM
Let's say #1 has finished completely.

josh

03/23/2020, 3:18 PM
Ah, I misunderstood at first. Yeah, if the input for #3 is `abc` then it should pull the GCS URI for the pickled `applebananacarrot`.
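The three-run scenario confirmed here can be sketched as a toy model in plain Python. This is only an illustration of the behaviour described in the thread, not Prefect's implementation: `ToyTaskCache`, `task_x`, and the timestamps are hypothetical, and the toy stores the result directly where the real setup would store a GCS URI pointing at the pickled value.

```python
import datetime
from typing import Callable, Dict, List, Tuple


class ToyTaskCache:
    """Toy stand-in for input-validated caching; not Prefect's implementation."""

    def __init__(self, cache_for: datetime.timedelta) -> None:
        self.cache_for = cache_for
        # Each entry holds (inputs, expiration time, stored result).
        self.entries: List[Tuple[Dict[str, str], datetime.datetime, str]] = []

    def run(
        self, fn: Callable[..., str], now: datetime.datetime, **inputs: str
    ) -> Tuple[str, bool]:
        """Return (result, was_cached), reusing an unexpired entry with equal inputs."""
        for cached_inputs, expires, result in self.entries:
            if cached_inputs == inputs and now < expires:
                return result, True
        result = fn(**inputs)
        self.entries.append((dict(inputs), now + self.cache_for, result))
        return result, False


OUTPUTS = {"abc": "applebananacarrot", "def": "danceearflight"}
calls: List[str] = []


def task_x(arg1: str) -> str:
    calls.append(arg1)  # record each real (non-cached) execution
    return OUTPUTS[arg1]


cache = ToyTaskCache(cache_for=datetime.timedelta(hours=1))
start = datetime.datetime(2020, 3, 23, 15, 0)  # arbitrary start time
five = datetime.timedelta(minutes=5)

run1 = cache.run(task_x, start, arg1="abc")             # executes task_x
run2 = cache.run(task_x, start + five, arg1="def")      # executes task_x
run3 = cache.run(task_x, start + 2 * five, arg1="abc")  # served from the cache
```

In this sketch the third run returns the value stored by the first run without calling `task_x` again, and the cache ends up holding one entry per distinct input.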

Cab Maddux

03/23/2020, 3:33 PM
OK cool, sounds good, thanks! What are the limits on cached values? If I set `cache_for` to a very large value and ran the flow many times with different inputs, when would we start running into problems?

josh

03/23/2020, 3:38 PM
I don’t believe that you would run into any “problems” besides making sure that your GCS bucket can hold all of your cached values 🙂

Cab Maddux

03/23/2020, 7:50 PM
OK sounds good thanks! 👍
@josh one more quick question. It looks like if I run a flow via Prefect Cloud using the k8s agent, a flow job is created and caching via Prefect Cloud runs properly. However, if I use a KubernetesJobEnvironment, which means the job created by the agent creates a secondary job (because I need my flow to run with a custom k8s config), I lose caching. Is it possible this setup leads to in-memory caching in the KubernetesJobEnvironment rather than Prefect Cloud caching?

josh

03/24/2020, 8:54 PM
Why is it that it loses caching? If the YAML provided to the KubernetesJobEnvironment (or baked into your Flow’s Docker storage) is authenticated to your GCS bucket it should still work

Cab Maddux

03/24/2020, 8:58 PM
Hmm definitely authenticated to GCS and I see results as cached once completed successfully, but subsequent runs never check the cache

josh

03/24/2020, 9:05 PM
Interesting! It could be that it is somehow switching to memory based caching but I’m trying to find where that would happen since it goes off of the state in Cloud. When you say that you see the results as cached does that mean you can see them in the bucket and the state in cloud has the URI?

Cab Maddux

03/24/2020, 9:08 PM
Actually, it looks like maybe checkpointing is turned off and needs to be set to true in the KubernetesJobEnvironment pod? Here's a screenshot of logs of a task run via Prefect Cloud for a flow that uses KubernetesJobEnvironment
And then here's logs for the same flow, same task, same inputs, etc. Only change is that I removed the KubernetesJobEnvironment (flow runs in job created from the agent)
logs.txt

josh

03/24/2020, 9:10 PM
On your k8s YAML for the environment, could you set this and see what happens?
PREFECT__FLOWS__CHECKPOINTING="true"

Cab Maddux

03/24/2020, 9:12 PM
Yeah that must be it since result handlers aren't being used. But worried if that's the case I'm going to run into this problem:
Which I think would mean caching basically won't work (the cache will look empty on each run of the flow)

josh

03/24/2020, 9:13 PM
That shouldn’t be the case when running in the context of Cloud (using the CloudTaskRunner which these runs use)
But I’m thinking that the caching just isn’t happening with this turned off maybe

Cab Maddux

03/25/2020, 1:25 AM
Yeah still nothing with k8s config containing CHECKPOINTING env var. I'll keep looking into it
Actually, looking like caching and result handlers are fine. Issue is that no DEBUG level logs are making it to prefect cloud when the flow is run with KubernetesJobEnvironment so I can't see the logs about caching
INFO level logs and above are fine

josh

03/25/2020, 3:30 AM
Ah! Try setting this on your yaml:
- name: PREFECT__LOGGING__LEVEL
  value: "DEBUG"
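For anyone generating the job YAML programmatically, here is a minimal sketch of adding the two environment variables mentioned in this thread to every container of a Kubernetes Job spec held as a Python dict. The helper `with_prefect_env` and the container name `flow-container` are hypothetical; only the variable names and values come from the messages above, and the nesting follows the standard `batch/v1` Job layout.

```python
import copy
from typing import Any, Dict


def with_prefect_env(job_spec: Dict[str, Any], settings: Dict[str, str]) -> Dict[str, Any]:
    """Return a copy of a k8s Job spec with the given env vars set on every container."""
    patched = copy.deepcopy(job_spec)  # leave the caller's spec untouched
    containers = patched["spec"]["template"]["spec"]["containers"]
    for container in containers:
        env = container.setdefault("env", [])
        for name, value in settings.items():
            existing = next((e for e in env if e.get("name") == name), None)
            if existing is not None:
                existing["value"] = value  # overwrite an earlier plain value
            else:
                env.append({"name": name, "value": value})
    return patched


# Hypothetical minimal Job spec; a real one would also carry image, command, etc.
job_spec = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {"template": {"spec": {"containers": [{"name": "flow-container"}]}}},
}

patched = with_prefect_env(
    job_spec,
    {
        "PREFECT__FLOWS__CHECKPOINTING": "true",
        "PREFECT__LOGGING__LEVEL": "DEBUG",
    },
)
```

The values are kept as strings because Kubernetes requires env var values to be strings, which is why the YAML above quotes `"DEBUG"`.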

Cab Maddux

03/25/2020, 3:32 AM
Yup! Should be good with that I think. Thanks!
Hey @josh one last question in this thread: I have a bunch of tasks that write a file locally and name that file with the md5 hash of the file contents. I have a custom result handler I can associate with those tasks, which is basically:
`write(result)`: upload the file at the task result's local file path to GCS, return the GCS URI
`read(uri)`: download from the GCS URI to the local file path, return the local file path
My issue is when I have 2 tasks back to back that use this result handler and both tasks are set up with caching based on the all-inputs cache validator. In order to validate inputs for the child task, for each potential cached state, the `read` method of the result handler for the parent task is called. This leads to downloading from GCS just to validate the returned local file path (which I could return without downloading if I were able to distinguish between a `read` call to return a cached output and a `read` call to validate an input for a child task).
My workaround right now is to put a passthrough task between these two with a regular JSONResultHandler that simply returns the argument it receives (`def passthrough(val): return val`), so that rather than downloading a file, I'm only deserializing JSON to check inputs for all cached states. But I'm wondering what pattern you would suggest here.
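To make the cost difference concrete, here is a toy model of the situation described above, written without Prefect. `FileHandler`, `JSONHandler`, `find_matching_state`, and the bucket name are all hypothetical stand-ins, and the loop only approximates "call `read` for each potential cached state"; Prefect's actual validation logic may differ.

```python
import json
from typing import Any, List

BUCKET_PREFIX = "gs://hypothetical-bucket/"
downloads: List[str] = []  # every simulated GCS download is recorded here


class FileHandler:
    """Stand-in for the custom handler: read() implies a download."""

    def write(self, local_path: str) -> str:
        return BUCKET_PREFIX + local_path.lstrip("/")

    def read(self, uri: str) -> str:
        downloads.append(uri)  # a real handler would fetch the file here
        return "/" + uri[len(BUCKET_PREFIX):]


class JSONHandler:
    """Stand-in for a JSON handler: read() only deserializes a string."""

    def write(self, result: Any) -> str:
        return json.dumps(result)

    def read(self, blob: str) -> Any:
        return json.loads(blob)


def find_matching_state(stored_inputs: List[str], handler: Any, current: str) -> int:
    """Hydrate each cached input via handler.read and return the matching index, or -1."""
    for index, stored in enumerate(stored_inputs):
        if handler.read(stored) == current:
            return index
    return -1


paths = ["/tmp/aaa", "/tmp/bbb", "/tmp/ccc"]  # hypothetical md5-named files

# Child task validated directly against the file-based parent results.
file_handler = FileHandler()
file_match = find_matching_state(
    [file_handler.write(p) for p in paths], file_handler, "/tmp/ccc"
)
downloads_for_files = len(downloads)

# Child task validated against a passthrough task's JSON results instead.
json_handler = JSONHandler()
json_match = find_matching_state(
    [json_handler.write(p) for p in paths], json_handler, "/tmp/ccc"
)
downloads_for_json = len(downloads) - downloads_for_files
```

In this sketch both lookups find the same cached state, but only the file-based one triggers simulated downloads, which is the trade the passthrough task makes.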

josh

03/25/2020, 7:28 PM
Oh that’s a fun one! I don’t have a best practice for you on that one but would you be willing to open a possible feature request on the repo with this? There are some result changes in the works in reference to PIN-16 and this feels like it could either use some improvement or be documented better

Cab Maddux

03/25/2020, 8:11 PM
Yeah 🙂 bit of a square peg-round hole problem! I'll open a feature request in the repo. Thanks for all of the help 🙏