emre
01/05/2021, 1:44 PM
meta_df = SnowflakePandasResultTask(
    db=SNOW_DB,
    checkpoint=True,
    result=LocalResult(dir=".prefect_cache"),
    cache_for=timedelta(days=14),
    cache_key="snow_pandas_out",
)(query=info_query)
This persists files with arbitrary names under .prefect_cache. On every run I get a warning that my cache is no longer valid. Can anyone point me to where I am doing things wrong?

Chris White
01/05/2021, 4:09 PM
When you run with flow.run alone, the storage of all previously cached runs occurs in memory; this means that if you call this from new processes, they have no way of sharing information.
However, there is a relatively simple workaround: all cached states from all tasks are stored in prefect.context.caches, so if you save this after each run and load it before each run, it should start behaving as you expect. Something like:
import cloudpickle
import prefect

# on save (after each run)
with open(".prefect_cache/THE_CACHE.pkl", "wb") as f:
    cloudpickle.dump(prefect.context.caches, f)

# on load (before each run)
with open(".prefect_cache/THE_CACHE.pkl", "rb") as f:
    the_cache = cloudpickle.load(f)
prefect.context.update(caches=the_cache)
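The "load before the run, save after the run" pattern above can be sketched end to end without Prefect installed. This is a generic illustration only: the standard library's pickle stands in for cloudpickle, a plain dict stands in for prefect.context.caches, and the cache path and sample key are hypothetical placeholders.

```python
import os
import pickle

# Hypothetical path, mirroring the snippet above.
CACHE_PATH = ".prefect_cache/THE_CACHE.pkl"

def load_cache(path=CACHE_PATH):
    """Load a previously persisted cache dict, or start empty."""
    if os.path.exists(path):
        with open(path, "rb") as f:
            return pickle.load(f)
    return {}

def save_cache(cache, path=CACHE_PATH):
    """Persist the in-memory cache dict so a later process can reuse it."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(cache, f)

# Simulate one run: load before, mutate in memory, save after.
cache = load_cache()
cache["snow_pandas_out"] = "cached-result-placeholder"  # stand-in value
save_cache(cache)
```

A fresh process calling load_cache() would then see the previously cached entries, which is what prefect.context.update(caches=...) accomplishes in the real workaround.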
emre
01/05/2021, 4:41 PM
Chris White
emre
01/05/2021, 5:12 PM