Danny Vilela
02/24/2021, 9:15 PM

[2021-02-24 12:14:55-0800] WARNING - prefect.TaskRunner | Task 'MyTask': Can't use cache because it is now invalid
I am using a custom Result subclass I wrote for caching PySpark DataFrames, which could be the issue. Here’s how I initialize and run the task:
import datetime as dt

# Cache my_task's output for 12 hours and persist it via a custom HDFS-backed Result.
my_task: MyTask = MyTask(
    checkpoint=True,
    cache_for=dt.timedelta(hours=12),
    result=HdfsDataFrameResult(spark=spark, location=hdfs_path_for_this_task),
)

By my understanding, this should cache the output of my_task(…) for 12 hours. So even if I restart the Python process (say, if I’m developing a flow within a notebook), I can restart the kernel as much as I’d like and still have that task access the cache… right? Am I missing something? Do I need a cache_key here to share the same cache (here, HDFS) between different flows?

josh
02/24/2021, 9:19 PM

Danny Vilela
02/24/2021, 9:20 PM

josh
02/24/2021, 9:22 PM

When running with Prefect core alone, the cache is only stored in the memory of the process, so restarting the kernel loses the Cached state (code that does this check is here). Running with a backend API will alleviate this, however you may be able to achieve what you are after by using targets:
https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target
https://docs.prefect.io/core/idioms/targets.html
Targets will always check for the existence of data regardless of previous state.
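
A minimal sketch of the target pattern josh is pointing to, assuming Prefect 1.x core and using the built-in LocalResult in place of a custom result (the flow name, directory, and target template are illustrative):

from prefect import Flow, task
from prefect.engine.results import LocalResult

# With a `target`, Prefect calls result.exists(target) before running the task;
# if the data is already there, the task is marked Cached and the stored value
# is read back instead of being recomputed.
@task(
    checkpoint=True,
    result=LocalResult(dir="/tmp/prefect-results"),  # stands in for any Result
    target="my_task-{today}",  # rendered from context, e.g. "my_task-2021-02-24"
)
def my_task():
    return "expensive computation"

with Flow("target-demo") as flow:
    my_task()

# When running core-only, checkpointing may also need to be enabled,
# e.g. PREFECT__FLOWS__CHECKPOINTING=true in the environment.
flow.run()

On a later flow.run() in a fresh process, the task should be skipped as long as the target file still exists, which is the behavior Danny is after.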

Danny Vilela
02/24/2021, 9:37 PM

And targets would still work with a custom result type (like my custom Result)?
josh
02/24/2021, 9:38 PM

Danny Vilela
02/24/2021, 10:15 PM

Ah, so the target check is Result specific — is that correct? So if my custom HdfsDataFrameResult class implements exists and expects HDFS paths, I should be able to pass an HDFS path as target and have that work out?
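
A rough sketch of how such a Result subclass could line up with a target, assuming Prefect 1.x's Result interface, Parquet as the on-disk format, and the Hadoop FileSystem API via the Spark JVM gateway for the existence check; this is illustrative, not Danny's actual HdfsDataFrameResult:

from prefect.engine.result import Result

class HdfsDataFrameResult(Result):
    """Sketch only: persist Spark DataFrames to HDFS as Parquet."""

    def __init__(self, spark, **kwargs):
        self.spark = spark
        super().__init__(**kwargs)

    def exists(self, location: str, **kwargs) -> bool:
        # Prefect renders the task's `target` template and passes it here, so an
        # HDFS path supplied as `target` is what this method ends up checking.
        path = location.format(**kwargs)
        sc = self.spark.sparkContext
        fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
        return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))

    def read(self, location: str) -> "Result":
        # Load the cached DataFrame back when the target already exists.
        new = self.copy()
        new.location = location
        new.value = self.spark.read.parquet(location)
        return new

    def write(self, value, **kwargs) -> "Result":
        # Persist the DataFrame to the formatted HDFS location.
        new = self.format(**kwargs)
        new.value = value
        value.write.mode("overwrite").parquet(new.location)
        return new

If exists behaves like that, an HDFS path passed as target should be handed to it before the task runs, which is the setup Danny is describing.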
josh
02/24/2021, 10:16 PM