
Danny Vilela

02/24/2021, 9:15 PM
Hi all. I’m trying to use task result caching in a flow, and I’m struggling to understand how Prefect actually retrieves cached results. I’m seeing this warning:
[2021-02-24 12:14:55-0800] WARNING - prefect.TaskRunner | Task 'MyTask': Can't use cache because it is now invalid
I am using a custom `Result` subclass I wrote for caching PySpark DataFrames, which could be the issue. Here’s how I initialize and run the task:
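import datetime as dt

# MyTask and HdfsDataFrameResult are user-defined; spark is the active SparkSession
my_task: MyTask = MyTask(
    checkpoint=True,
    cache_for=dt.timedelta(hours=12),  # keep the cached output valid for 12 hours
    result=HdfsDataFrameResult(spark=spark, location=hdfs_path_for_this_task),
)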
As I understand it, this should cache the output of `my_task(…)` for 12 hours. So even if I restart the Python process (say, while developing a flow in a notebook), I should be able to restart the kernel as often as I like and still have that task read from the cache… right? Am I missing something? Do I need a `cache_key` here to share the same cache (in this case, HDFS) between different flows?

josh

02/24/2021, 9:19 PM
@Danny Vilela, are you running the flow manually or with a backend API (Server or Cloud)?

Danny Vilela

02/24/2021, 9:20 PM
Hi @josh! I’m running this manually and locally.

josh

02/24/2021, 9:22 PM
When you run it manually, the flow run has no record of previous states, because they aren’t persisted anywhere. Each time you run it, the task starts as a brand-new task run rather than from a previous `Cached` state (the code that does this check is here). Running with a backend API would fix this. However, you may be able to get what you’re after by using targets:
https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target
https://docs.prefect.io/core/idioms/targets.html
Targets always check for the existence of data, regardless of previous state. A cache key might also help here, but I can’t say for certain (it doesn’t hurt to test it, though!).
Edit: I don’t think it will, because there won’t be any caches in context.
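A minimal toy model of the difference described above, in plain Python. It is not Prefect's implementation, and the function names are hypothetical. State-based caching can only reuse output when the runner is handed a previous cached state, which a fresh local process never has. A target check only asks whether data already exists at a location, so it survives a kernel restart.

```python
import datetime as dt
import os
import pickle
import tempfile

# Toy model only (hypothetical helpers, NOT Prefect's code).

def run_with_state_cache(fn, previous_state, cache_for, now):
    """Reuse the output only if a previous, unexpired cached state is passed in."""
    if previous_state is not None and now < previous_state["cached_at"] + cache_for:
        return previous_state["value"], previous_state, True
    value = fn()
    return value, {"value": value, "cached_at": now}, False


def run_with_target(fn, target_path):
    """Reuse the output if data already exists at the target, ignoring any state."""
    if os.path.exists(target_path):
        with open(target_path, "rb") as f:
            return pickle.load(f), True
    value = fn()
    with open(target_path, "wb") as f:
        pickle.dump(value, f)
    return value, False


calls = []

def expensive():
    calls.append(1)  # count how often the "task" body actually runs
    return sum(range(10))

now = dt.datetime(2021, 2, 24, 12, 0)
cache_for = dt.timedelta(hours=12)

# First run: no previous state, so the task body runs.
_, state, state_hit_1 = run_with_state_cache(expensive, None, cache_for, now)
# Same process: the state is still in memory, so it is reused.
_, _, state_hit_2 = run_with_state_cache(expensive, state, cache_for, now)
# Restarted kernel: nothing was persisted, so there is no state to pass in.
_, _, state_hit_3 = run_with_state_cache(expensive, None, cache_for, now)

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "my_task.pkl")
    _, target_hit_1 = run_with_target(expensive, target)  # runs and writes the file
    _, target_hit_2 = run_with_target(expensive, target)  # only needs the file to exist
```

The second target call needs nothing from the first call except the file on disk, which is why a target-based check also works across separate processes.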

Danny Vilela

02/24/2021, 9:37 PM
So, to be clear: if you’re running Prefect locally, you can’t use the caching functionality between flow runs in separate processes, even if you know how to recover a previous task’s results (i.e., via the custom `Result`)?

josh

02/24/2021, 9:38 PM
Correct; that requires persisting state between runs. Targets should help you achieve what you’re after, though.

Danny Vilela

02/24/2021, 10:15 PM
Ah, gotcha. From the docs, it isn’t very clear how to specify a target that isn’t on the local filesystem; for example, I’m not sure how to provide an HDFS path as a target. The Result Types call-out seems to say this is `Result`-specific. Is that correct? So if my custom `HdfsDataFrameResult` class implements `exists` and expects HDFS paths, I should be able to pass an HDFS path as `target` and have it work?

josh

02/24/2021, 10:16 PM
Yeah that is how it should work!
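A rough sketch of why this works: the target is only a location string, and the `Result` object decides what that location means through its `exists` / `read` / `write` methods. `FakeHdfsResult`, `run_task_with_target`, and the `hdfs:///user/danny/...` path are hypothetical; an in-memory dict stands in for HDFS so the sketch runs without a cluster, and the method signatures are simplified (Prefect's own `Result` methods return `Result` objects rather than raw values).

```python
from typing import Any, Callable, Dict


class FakeHdfsResult:
    """Hypothetical stand-in for a custom Result such as HdfsDataFrameResult.

    A shared dict plays the role of HDFS; keys are "hdfs://" locations.
    """

    def __init__(self, store: Dict[str, Any]):
        self.store = store

    def exists(self, location: str) -> bool:
        return location in self.store

    def read(self, location: str) -> Any:
        return self.store[location]

    def write(self, value: Any, location: str) -> str:
        self.store[location] = value
        return location


def run_task_with_target(fn: Callable[[], Any], result: FakeHdfsResult,
                         target_template: str, **context: Any) -> Any:
    # The template is filled from context; only the Result interprets the
    # resulting location, so an HDFS-style path works if exists() understands it.
    location = target_template.format(**context)
    if result.exists(location):
        return result.read(location)
    value = fn()
    result.write(value, location)
    return value


hdfs: Dict[str, Any] = {}  # shared "HDFS" that outlives each simulated run
runs = []

def build_frame():
    runs.append(1)  # count how often the expensive body runs
    return [("a", 1), ("b", 2)]

template = "hdfs:///user/danny/cache/{task_name}/{date}.parquet"
first = run_task_with_target(build_frame, FakeHdfsResult(hdfs), template,
                             task_name="MyTask", date="2021-02-24")
second = run_task_with_target(build_frame, FakeHdfsResult(hdfs), template,
                              task_name="MyTask", date="2021-02-24")
```

Each call builds a fresh `FakeHdfsResult`, much as a restarted process would, and the second call still skips the body because `exists` finds the data at the HDFS-style location.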