Thread
#prefect-community

    Cab Maddux

    2 years ago
    Hi, looking to clarify a few details about persistence and caching. Given flow `A` containing task `X`, which has input `arg1`, takes a long time to run, and returns a string. It uses `GCSResultHandler`, with `cache_for=datetime.timedelta(hours=1)` and `cache_validator=prefect.engine.cache_validators.all_inputs`:
    1. A run of flow `A` is triggered; the input for task `X` is `arg1='abc'` and the output is `applebananacarrot`.
    2. 5 minutes later, another run of flow `A` is triggered; the input for task `X` is `arg1='def'` and the output is `danceearflight`.
    3. 5 minutes later, another run of flow `A` is triggered; the input for task `X` is `arg1='abc'`.
    After #1 we'll have a cached value for task `X` and input `abc` pointing to a GCS URI storing pickled `applebananacarrot`. After #2 we'll have an additional cached value for task `X` and input `def` pointing to a GCS URI storing pickled `danceearflight`. Then #3 will pull the GCS URI for pickled `applebananacarrot` from the cache. Is that correct?
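    The three runs above can be modeled in plain Python. This is a toy model of how an `all_inputs`-style cache behaves, not Prefect's actual implementation; the `cache` dict, the `run_task` helper, and the fake GCS URIs are all hypothetical stand-ins:

    ```python
    import datetime

    # Toy model of all_inputs caching: an entry is reused only when the inputs
    # match exactly and the entry has not passed its cache_for expiration.
    cache = {}  # frozen inputs -> (result_uri, expiration_time)

    def run_task(inputs, compute, cache_for, now):
        """Return (uri, was_cached) for a task run with the given inputs."""
        key = frozenset(inputs.items())
        if key in cache:
            uri, expires = cache[key]
            if now < expires:
                return uri, True          # cache hit: skip the expensive task
        uri = compute(**inputs)           # run the task; uri stands in for a GCS URI
        cache[key] = (uri, now + cache_for)
        return uri, False

    t0 = datetime.datetime(2020, 1, 1)
    minutes = lambda m: datetime.timedelta(minutes=m)
    store = lambda arg1: f"gs://bucket/pickled-output-for-{arg1}"

    uri1, hit1 = run_task({"arg1": "abc"}, store, minutes(60), t0)               # run 1: computes
    uri2, hit2 = run_task({"arg1": "def"}, store, minutes(60), t0 + minutes(5))  # run 2: computes
    uri3, hit3 = run_task({"arg1": "abc"}, store, minutes(60), t0 + minutes(10)) # run 3: cache hit
    ```

    Run 3 returns run 1's URI without recomputing, while run 2's different input created a separate cache entry, matching the behavior described above.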

    josh

    2 years ago
    In this scenario does #2 happen while #1 is still running and has not completed?

    Cab Maddux

    2 years ago
    Let's say #1 has finished completely.

    josh

    2 years ago
    Ah, I misunderstood at first. Yeah, if the input for #3 is `abc` then it should pull the GCS URI for the pickled `applebananacarrot`.

    Cab Maddux

    2 years ago
    OK cool, sounds good, thanks! What are the limits on cached values? If I set `cache_for` to a very large value and ran the flow many times with different inputs, when would we start running into problems?

    josh

    2 years ago
    I don’t believe that you would run into any “problems” besides making sure that your GCS bucket can hold all of your cached values 🙂

    Cab Maddux

    2 years ago
    OK sounds good thanks! 👍
    @josh one more quick question. It looks like if I run a flow via Prefect Cloud using the k8s agent, a flow job is created and caching via Prefect Cloud works properly. However, if I use a `KubernetesJobEnvironment`, which means the job created by the agent spawns a secondary job (because I need my flow to run with a custom k8s config), I lose caching. Is it possible this setup leads to in-memory caching in the `KubernetesJobEnvironment` rather than Prefect Cloud caching?

    josh

    2 years ago
    Why is it that it loses caching? If the YAML provided to the `KubernetesJobEnvironment` (or baked into your Flow's Docker storage) is authenticated to your GCS bucket, it should still work.

    Cab Maddux

    2 years ago
    Hmm, it's definitely authenticated to GCS, and I see results as cached once a run completes successfully, but subsequent runs never check the cache.

    josh

    2 years ago
    Interesting! It could be that it is somehow switching to memory-based caching, but I'm trying to find where that would happen, since it goes off of the state in Cloud. When you say that you see the results as cached, does that mean you can see them in the bucket and the state in Cloud has the URI?

    Cab Maddux

    2 years ago
    Actually, it looks like maybe checkpointing needs to be set to true in the `KubernetesJobEnvironment` pod? Here's a screenshot of logs of a task run via Prefect Cloud for a flow that uses `KubernetesJobEnvironment`.
    And then here are logs for the same flow, same task, same inputs, etc. The only change is that I removed the `KubernetesJobEnvironment` (the flow runs in the job created by the agent).

    josh

    2 years ago
    On your k8s YAML for the environment, could you set `PREFECT__FLOWS__CHECKPOINTING="true"` and see what happens?
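    In a job spec, that would go in the container's `env` list — a sketch, assuming a standard Kubernetes Job YAML (the surrounding field names here are the usual k8s ones, not taken from this thread):

    ```yaml
    # Inside the container spec of the job YAML given to KubernetesJobEnvironment
    env:
      - name: PREFECT__FLOWS__CHECKPOINTING
        value: "true"
    ```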

    Cab Maddux

    2 years ago
    Yeah, that must be it, since result handlers aren't being used. But I'm worried that if that's the case, I'm going to run into this problem:
    Which I think would mean caching basically won't work (the cache will look empty on each run of the flow).

    josh

    2 years ago
    That shouldn't be the case when running in the context of Cloud (using the `CloudTaskRunner`, which these runs use).
    But I'm thinking that the caching just isn't happening with this turned off, maybe.

    Cab Maddux

    2 years ago
    Yeah, still nothing with the k8s config containing the `CHECKPOINTING` env var. I'll keep looking into it.
    Actually, it looks like caching and result handlers are fine. The issue is that no DEBUG-level logs are making it to Prefect Cloud when the flow is run with `KubernetesJobEnvironment`, so I can't see the logs about caching.
    INFO-level logs and above are fine.

    josh

    2 years ago
    Ah! Try setting this on your yaml:
    - name: PREFECT__LOGGING__LEVEL
      value: "DEBUG"

    Cab Maddux

    2 years ago
    Yup! Should be good with that I think. Thanks!
    Hey @josh, one last question in this thread: I have a bunch of tasks that write a file locally and name that file with the md5 hash of the file contents. I have a custom result handler I can associate with those tasks, which is basically:
    `write(result)`: upload the file at the task result's local file path to GCS, return the GCS URI
    `read(uri)`: download from the GCS URI to a local file path, return the local file path
    My issue is when I have 2 tasks back to back that use this result handler and both tasks are set up with caching based on the `all_inputs` cache validator. In order to validate inputs for the child task, for each potential cached state, the `read` method of the result handler for the parent task is called. This leads to downloading from GCS just to validate the returned local file path (which I could return without downloading if I were able to distinguish between a read call to return a cached output and a read call to validate an input for a child task). My workaround right now is to put a passthrough task between these two with a regular `JSONResultHandler` that simply returns the argument it receives (`def passthrough(val): return val`), but I'm wondering what pattern you would suggest here (that way, rather than downloading a file, I'm only deserializing JSON to check inputs for all cached states).
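    The workaround can be sketched as a toy model in plain Python. The handler classes and the simulated download log below are hypothetical stand-ins, not Prefect's actual `ResultHandler` API; the point is only that validating a child task's cached inputs calls `read` on the parent's handler, and a JSON passthrough makes that call cheap:

    ```python
    import json

    downloads = []  # records every simulated GCS download

    class GCSFileHandler:
        """Toy stand-in for the custom handler: read() must download the file."""
        def write(self, local_path):
            return f"gs://bucket/{local_path}"        # pretend upload, return URI
        def read(self, uri):
            downloads.append(uri)                     # cache validation pays this cost
            return uri.rsplit("/", 1)[-1]             # local file path

    class JSONHandler:
        """Toy stand-in for JSONResultHandler: read() only deserializes."""
        def write(self, value):
            return json.dumps(value)
        def read(self, blob):
            return json.loads(blob)

    def passthrough(val):
        return val  # the intermediate task from the workaround

    # Without the passthrough, validating the child task's cached inputs reads
    # the parent's GCS result, forcing a download:
    gcs = GCSFileHandler()
    stored = gcs.write("abc123.dat")
    path = gcs.read(stored)                           # one simulated download

    # With the passthrough task's JSON handler, validation only parses JSON:
    js = JSONHandler()
    same_path = js.read(js.write(passthrough(path)))  # no download recorded
    ```

    The passthrough trades one extra (trivial) task for replacing a GCS round-trip with a JSON deserialization on every cache-validation pass.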

    josh

    2 years ago
    Oh, that's a fun one! I don't have a best practice for you on that one, but would you be willing to open a possible feature request on the repo with this? There are some result changes in the works in reference to PIN-16, and this feels like it could either use some improvement or be documented better.

    Cab Maddux

    2 years ago
    Yeah 🙂 bit of a square-peg-round-hole problem! I'll open a feature request in the repo. Thanks for all of the help 🙏