    Danny Vilela

    1 year ago
    Hi all. I’m trying to use task result caching in a flow, and I’m struggling to understand how Prefect actually does cache retrieval. I’m seeing this warning:
    [2021-02-24 12:14:55-0800] WARNING - prefect.TaskRunner | Task 'MyTask': Can't use cache because it is now invalid
    I am using a custom `Result` subclass I wrote for caching PySpark DataFrames, which could be the issue. Here’s how I initialize the task:
    my_task: MyTask = MyTask(
        checkpoint=True,
        cache_for=dt.timedelta(hours=12),
        result=HdfsDataFrameResult(spark=spark, location=hdfs_path_for_this_task),
    )
    By my understanding, this should cache the output of `my_task(…)` for 12 hours. So even if I restart the Python process (say, if I’m developing a flow within a notebook), I can restart the kernel as much as I’d like and still have that task access the cache, right? Am I missing something? Do I need a `cache_key` here to share the same cache (here, HDFS) between different flows?
    josh

    1 year ago
    @Danny Vilela are you running the flow manually or with a backend API (server/cloud)?
    Danny Vilela

    1 year ago
    Hi @josh! I’m running this manually and locally.
    josh

    1 year ago
    So when running it manually, the flow run has no record of previous states (because they aren’t persisted anywhere). Each time you run it, it starts up again as if it were a new task run, so the task is not in a previous `Cached` state (code that does this check is here). Running with a backend API will alleviate this; however, you may be able to achieve what you are after by using targets:
    https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target
    https://docs.prefect.io/core/idioms/targets.html
    Targets will always check for the existence of data regardless of previous state. A cache key may also be able to help you here, but I cannot say for certain (doesn’t hurt to give it a test though!).
    Edit: I don’t think it will, because there won’t be any caches in context.
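The difference josh describes can be sketched in plain Python, without Prefect itself. This is only an illustration under stated assumptions: `run_with_memory_cache`, `run_with_target`, and the pickle file are hypothetical stand-ins, not Prefect APIs. The in-memory dict plays the role of task state that is lost when the process exits, while the target-style check looks at storage and so survives a restart.

```python
import os
import pickle
import tempfile

# Stand-in for cached task state held only by the running process.
# It is gone after a kernel restart, like local runs without a backend.
_in_memory_cache = {}


def run_with_memory_cache(key, fn):
    """Reuse a value only if this process has already computed it."""
    if key not in _in_memory_cache:
        _in_memory_cache[key] = fn()
    return _in_memory_cache[key]


def run_with_target(target_path, fn):
    """Skip fn if target_path already exists; otherwise run it and persist the output."""
    if os.path.exists(target_path):
        with open(target_path, "rb") as f:
            return pickle.load(f)
    value = fn()
    with open(target_path, "wb") as f:
        pickle.dump(value, f)
    return value


memory_calls = []
target_calls = []


def expensive_for_memory():
    memory_calls.append(1)
    return {"rows": 3}


def expensive_for_target():
    target_calls.append(1)
    return {"rows": 3}


target = os.path.join(tempfile.mkdtemp(), "my_task.pkl")

# First "process": both approaches compute the value once.
run_with_memory_cache("my_task", expensive_for_memory)
first = run_with_target(target, expensive_for_target)

# Simulate restarting the Python process: in-memory state is wiped.
_in_memory_cache.clear()

# Second "process": the memory cache recomputes, the target check does not.
run_with_memory_cache("my_task", expensive_for_memory)
second = run_with_target(target, expensive_for_target)
```

After the simulated restart, the in-memory approach runs the function again, while the target-based approach finds the file and loads it.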
    Danny Vilela

    1 year ago
    So to be clear, if you’re running Prefect locally you can’t use the caching functionality between flow runs in separate processes, even if you know how to recover a previous task’s results (i.e., via the custom `Result`)?
    josh

    1 year ago
    Correct, that will require some persistence of state between runs. Targets should help you achieve what you’re after, though.
    Danny Vilela

    1 year ago
    Ah, gotcha. From the docs, it isn’t super clear how to specify a target that doesn’t belong to the local filesystem. For example, I’m not sure how to provide an HDFS path as a target. It seems like the Result Types call-out says this is `Result`-specific. Is that correct? So if my custom `HdfsDataFrameResult` class implements `exists` and expects HDFS paths, I should be able to pass an HDFS path as `target` and have that work out?
    josh

    1 year ago
    Yeah that is how it should work!
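To make the idea in this exchange concrete, here is a minimal plain-Python sketch of why the target's path format is the result object's concern. It does not use Prefect: `FakeRemoteStore`, `RemoteResult`, `run_with_target`, and the `hdfs://` path are all hypothetical, and the method signatures are simplified, so the real `Result` interface should be checked against the Prefect docs. The point is only that the runner delegates the existence check to the result, so any location string the result understands can serve as a target.

```python
from typing import Any, Callable, Dict


class FakeRemoteStore:
    """In-memory stand-in for a remote filesystem client such as HDFS."""

    def __init__(self) -> None:
        self._blobs: Dict[str, Any] = {}

    def exists(self, path: str) -> bool:
        return path in self._blobs

    def get(self, path: str) -> Any:
        return self._blobs[path]

    def put(self, path: str, value: Any) -> None:
        self._blobs[path] = value


class RemoteResult:
    """Result-like object: it alone knows how to interpret a location string."""

    def __init__(self, store: FakeRemoteStore) -> None:
        self.store = store

    def exists(self, location: str) -> bool:
        return self.store.exists(location)

    def read(self, location: str) -> Any:
        return self.store.get(location)

    def write(self, value: Any, location: str) -> None:
        self.store.put(location, value)


def run_with_target(fn: Callable[[], Any], result: RemoteResult, target: str) -> Any:
    """Run fn only if the result reports that the target does not exist yet."""
    if result.exists(target):
        return result.read(target)
    value = fn()
    result.write(value, target)
    return value


store = FakeRemoteStore()
result = RemoteResult(store)
target = "hdfs://namenode/cache/my_task.parquet"  # hypothetical path

runs = []


def build_frame():
    runs.append("ran")
    return [("a", 1), ("b", 2)]


out1 = run_with_target(build_frame, result, target)  # computes and writes
out2 = run_with_target(build_frame, result, target)  # found via exists(), read back
```

The runner never inspects the path itself, which is why a non-local path can work as long as the result's `exists`, `read`, and `write` agree on what it means.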