# ask-community
e
Hey guys, I'm trying to set up caching for a project I'm running locally, with a local Prefect server instance. For individual tasks caching works fine, but for mapped tasks the following happens: the first task runs and caches its output; the other mapped tasks just assume the first task's output is a valid cache, and use that. Here is the result object:
jsonresult = LocalResult(
    location="{flow_id}_{task_full_name}_{flow_run_name}_ignore.json",
    serializer=JSONSerializer(),
)
and the task in question:
# timedelta comes from the standard library; MatchColumns is my Task
# subclass and some_list is defined elsewhere in the project.
from datetime import timedelta

colmatch = MatchColumns(
    result=jsonresult, cache_for=timedelta(days=14)
).map(
    table_metadata=some_list
)
Any ideas why this would happen, and how I can prevent it?
k
Hi @emre! I think this thread may help you. This is the link mentioned there. Not entirely sure it will work but I think it’s worth a try.
e
I see, I believe my settings do produce unique result locations as well. {task_full_name} includes the map_index in it, and {flow_run_name} should help distinguish between flow runs, at least in a local development environment. Also, if I disable caching, all tasks run and they produce distinct result files. I believe the problem is these mapped tasks get assigned the same cache key, or something like that. I'll dig around more when I'm available.
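As an aside, here is a plain-Python sketch of how that location template expands per mapped child; the flow_id and flow_run_name values below are made up for illustration, but task_full_name really does embed the map index (e.g. "test[0]"):
template = "{flow_id}_{task_full_name}_{flow_run_name}_ignore.json"

for i in range(2):
    print(template.format(
        flow_id="4f2a...",           # hypothetical flow ID
        task_full_name=f"test[{i}]",  # map index is part of the name
        flow_run_name="nimble-fox",   # hypothetical run name
    ))
# 4f2a..._test[0]_nimble-fox_ignore.json
# 4f2a..._test[1]_nimble-fox_ignore.json
So the result locations are distinct per child; the shared cache key is the suspect.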
k
Now I see what you are saying. I replicated it with the code below:
from prefect import task, Flow
from prefect.engine.serializers import JSONSerializer
from prefect.engine.results import LocalResult
import json
import datetime

# Same result config as above: the location template is unique per mapped
# child, since task_full_name includes the map index.
jsonresult = LocalResult(
    location="{flow_id}_{task_full_name}_{flow_run_name}_ignore.json",
    serializer=JSONSerializer(),
)

@task(result=jsonresult, cache_for=datetime.timedelta(hours=1))
def test(x):
    return json.dumps({"num": x})

with Flow("mapping_results") as flow:
    a = test.map([1, 2, 3, 4, 5, 6])

flow.register("tutorial")
I will ask the team about this.
e
Yeah, this definitely reproduces my issue. I'll write down some notes so I don't forget any in the morning:
• Registering a new flow version invalidates the cache; the first mapped task runs and creates a new file.
• New flow runs don't invalidate the cache; all mapped tasks use the same cached file from the previous flow run.
Btw I found out about the target setting in Prefect Core; it solved my basic caching needs for local development, without the extra overhead of Server. Pretty nice stuff: https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target
👍 1
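For reference, a minimal sketch of that target route, assuming Prefect 1.x: the template fields come from prefect.context, and checkpointing has to be enabled for purely local runs (e.g. PREFECT__FLOWS__CHECKPOINTING=true). The file names here are illustrative.
from prefect import task, Flow
from prefect.engine.results import LocalResult

# Each mapped child writes (and later finds) its own file, because the
# target template includes {map_index}. If the file exists, the task is
# skipped and its contents are loaded instead.
@task(
    target="{flow_name}-{task_name}-{map_index}.json",
    result=LocalResult(dir="."),
    checkpoint=True,
)
def test(x):
    return x * 2

with Flow("target_caching") as flow:
    test.map([1, 2, 3])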
k
OK, so mapped tasks with a cache won't re-run, because they will just use the cache generated by the first mapped task run. You should use it with a cache_validator that checks the inputs into the task as well.
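For example, a minimal sketch of wiring up an input-aware validator, assuming Prefect 1.x, where all_inputs lives in prefect.engine.cache_validators:
from datetime import timedelta

from prefect import task, Flow
from prefect.engine.cache_validators import all_inputs

# The validator compares the cached run's inputs to the current call's
# inputs, so each mapped child only reuses a cache produced for the same x.
@task(cache_for=timedelta(hours=1), cache_validator=all_inputs)
def test(x):
    return x * 2

with Flow("mapping_with_validator") as flow:
    test.map([1, 2, 3])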
e
Thanks for following up after all this time! I don't think I understand what's going on right now. I set up cache_validator=all_parameters:
• Without a target param, the old issue persists: the first mapped task runs, the others use its result.
• With a target param, every mapped task generates its own result and uses that cached result. However, using target doesn't seem to respect task inputs at all, contrary to the cache_validator setting.
k
target and cache_for are two separate routes in the code base. target is based on file existence. Which cache validator are you using? I think you have all_inputs?
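To make the two routes concrete, a rough sketch under the same Prefect 1.x assumption (file names are illustrative):
from datetime import timedelta

from prefect import task
from prefect.engine.cache_validators import all_inputs
from prefect.engine.results import LocalResult

# Route 1: target — the task is skipped whenever the templated file already
# exists; the inputs are never consulted.
@task(target="test-{map_index}.json", result=LocalResult(dir="."))
def by_target(x):
    return x

# Route 2: cache_for + cache_validator — the cached state expires after the
# duration and is revalidated, here against all of the task's inputs.
@task(cache_for=timedelta(hours=1), cache_validator=all_inputs)
def by_cache(x):
    return x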