emre
01/05/2021, 1:44 PM
from datetime import timedelta
from prefect.engine.results import LocalResult

meta_df = SnowflakePandasResultTask(
    db=SNOW_DB,
    checkpoint=True,
    result=LocalResult(dir=".prefect_cache"),
    cache_for=timedelta(days=14),
    cache_key="snow_pandas_out",
)(query=info_query)
This persists files with arbitrary names under .prefect_cache. On every run I get a warning that my cache is no longer valid. Can anyone point me to where I am doing things wrong?
Chris White
01/05/2021, 4:09 PM
With flow.run alone, the storage of all previous cached runs occurs in memory; this means that if you call this from new processes, they have no way of sharing information.
However, there is a relatively simple workaround: all cached states from all tasks are stored in prefect.context.caches, so if you save this after each run and load it before each run, it should start behaving as you expect. Something like:
import cloudpickle
import prefect

# on save (after the flow run)
with open(".prefect_cache/THE_CACHE.pkl", "wb") as f:
    cloudpickle.dump(prefect.context.caches, f)

# on load (before the next flow run)
with open(".prefect_cache/THE_CACHE.pkl", "rb") as f:
    the_cache = cloudpickle.load(f)
prefect.context.update(caches=the_cache)
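Putting it together, a minimal end-to-end sketch of that workaround (the flow name, the get_data task, and its dummy return value are illustrative, not from this thread): load the pickled caches into prefect.context before flow.run, then dump them back out afterwards so a later process can reuse them.
import os
from datetime import timedelta

import cloudpickle
import prefect
from prefect import Flow, task

CACHE_FILE = ".prefect_cache/THE_CACHE.pkl"

@task(cache_for=timedelta(days=14), cache_key="snow_pandas_out")
def get_data():
    # hypothetical stand-in for the Snowflake query task
    return 42

with Flow("cached-snowflake") as flow:
    get_data()

# load previously persisted caches (if any) before running
if os.path.exists(CACHE_FILE):
    with open(CACHE_FILE, "rb") as f:
        prefect.context.update(caches=cloudpickle.load(f))

flow.run()

# persist the in-memory caches so the next process can reuse them
os.makedirs(".prefect_cache", exist_ok=True)
with open(CACHE_FILE, "wb") as f:
    cloudpickle.dump(prefect.context.caches, f)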
emre
01/05/2021, 4:41 PM
Chris White
emre
01/05/2021, 5:12 PM