emre

    1 year ago
    Hey guys, I'm trying to set up caching for a project I'm running locally with a local Prefect Server instance. For individual tasks, caching works fine, but for mapped tasks the following happens: the first task runs and caches its output, and the other mapped tasks just assume the first task's output is a valid cache and use it. Here is the result object:
    jsonresult = LocalResult(
        location="{flow_id}_{task_full_name}_{flow_run_name}_ignore.json",
        serializer=JSONSerializer(),
    )
    And the task in question:
    colmatch = MatchColumns(
        result=jsonresult, cache_for=timedelta(days=14)
    ).map(
        table_metadata=some_list
    )
    Any ideas why this would happen, and how I can prevent it?
    Kevin Kho

    1 year ago
    Hi @emre! I think this thread may help you. This is the link mentioned there. I'm not entirely sure it will work, but I think it's worth a try.
    emre

    1 year ago
    I see. I believe my settings already produce unique result locations: {task_full_name} includes the map_index, and {flow_run_name} should distinguish between flow runs, at least in a local development environment. Also, if I disable caching, all tasks run and produce distinct result files. I believe the problem is that these mapped tasks get assigned the same cache key, or something like that. I'll dig around more when I'm available.
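A minimal pure-Python model of the hypothesis above: each mapped child renders its own result location, but the cache lookup is keyed on the task alone, so the first child's cached output satisfies every later child. This is a simplified sketch under that assumption, not Prefect's actual code; `format_location`, `run_mapped`, and the flow/run identifiers are hypothetical.

```python
from datetime import datetime, timedelta

# Same shape as the location template in the question.
TEMPLATE = "{flow_id}_{task_full_name}_{flow_run_name}_ignore.json"


def format_location(flow_id, task_name, map_index, flow_run_name):
    # Assumption: mapped children render task_full_name as "name[index]".
    full_name = f"{task_name}[{map_index}]"
    return TEMPLATE.format(
        flow_id=flow_id, task_full_name=full_name, flow_run_name=flow_run_name
    )


def run_mapped(task_name, inputs, cache, now, cache_for=timedelta(days=14)):
    """Run a mapped task against a cache keyed ONLY on the task name.

    Every child looks up the same key, so whichever child caches first is
    reused by all later children, even though their locations differ.
    """
    outputs = []
    for map_index, x in enumerate(inputs):
        entry = cache.get(task_name)
        if entry is not None and now < entry["expires"]:
            outputs.append(entry["value"])  # cache hit: this child does not run
            continue
        value = {"num": x}  # stand-in for the real task body
        location = format_location("flow-1", task_name, map_index, "run-a")
        cache[task_name] = {
            "value": value,
            "location": location,
            "expires": now + cache_for,
        }
        outputs.append(value)
    return outputs


cache = {}
print(run_mapped("MatchColumns", [1, 2, 3], cache, datetime(2021, 1, 1)))
# [{'num': 1}, {'num': 1}, {'num': 1}]
print(format_location("flow-1", "MatchColumns", 2, "run-a"))
# flow-1_MatchColumns[2]_run-a_ignore.json
```

In this model, unique locations alone cannot help: the lookup never consults the location, only the shared key.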
    Kevin Kho

    1 year ago
    I see what you mean now. I reproduced it with the code below:
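    from prefect import task, Flow
    from prefect.engine.serializers import JSONSerializer
    from prefect.engine.results import LocalResult
    import datetime

    # Same templated location as in the question; {task_full_name}
    # includes the map index, so each mapped child gets its own file.
    jsonresult = LocalResult(
        location="{flow_id}_{task_full_name}_{flow_run_name}_ignore.json",
        serializer=JSONSerializer(),
    )

    # Observed in this thread: with cache_for alone, the first mapped child
    # caches its output and the remaining children reuse it instead of running.
    @task(result=jsonresult, cache_for=datetime.timedelta(hours=1))
    def test(x):
        # JSONSerializer already encodes to JSON, so return a plain dict
        return {"num": x}

    with Flow("mapping_results") as flow:
        a = test.map([1, 2, 3, 4, 5, 6])

    # Registers the flow under the "tutorial" project (the project must exist)
    flow.register("tutorial")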
    I will ask the team about this.
    emre

    1 year ago
    Yeah, this definitely reproduces my issue. I'll write down some notes so I don't forget anything by morning:
    • Registering a new flow version invalidates the cache: the first mapped task runs and creates a new file.
    • New flow runs don't invalidate the cache: all mapped tasks use the same cached file from the previous flow.
    BTW, I found out about the target setting in Prefect Core. It solved my basic caching needs for local development without the extra overhead of Server. Pretty nice stuff: https://docs.prefect.io/core/concepts/persistence.html#output-caching-based-on-a-file-target
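A rough pure-Python model of file-target caching as described in the linked docs and in Kevin's remark that target "is based on file existence": each mapped child renders its own target path; if that file exists, the child is skipped and the stored value is loaded, otherwise it runs and writes the file. This is a simplified sketch, not Prefect's implementation; `run_with_target`, `match_columns`, and the `{task_name}-{map_index}.json` template are hypothetical. Because the lookup depends only on the rendered path, changing a child's input without changing its map index returns the old file.

```python
import json
import os
import tempfile


def run_with_target(fn, inputs, target_dir, template="{task_name}-{map_index}.json"):
    """Hypothetical model of target-based caching for a mapped task.

    Each child renders its own target path; if that file already exists the
    child is skipped and the stored value is loaded, regardless of inputs.
    """
    outputs = []
    for map_index, x in enumerate(inputs):
        name = template.format(task_name=fn.__name__, map_index=map_index)
        path = os.path.join(target_dir, name)
        if os.path.exists(path):
            with open(path) as f:
                outputs.append(json.load(f))  # file exists: fn is not called
            continue
        value = fn(x)
        with open(path, "w") as f:
            json.dump(value, f)
        outputs.append(value)
    return outputs


def match_columns(x):
    # Stand-in task body; the real task's logic is not shown in the thread.
    return {"num": x}


with tempfile.TemporaryDirectory() as d:
    print(run_with_target(match_columns, [1, 2, 3], d))
    # [{'num': 1}, {'num': 2}, {'num': 3}]
    print(run_with_target(match_columns, [7, 8, 9], d))
    # [{'num': 1}, {'num': 2}, {'num': 3}]  (stale: inputs are ignored)
```

Including an input-derived value in the template would be one way to make such a target input-sensitive, at the cost of more files.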
    Kevin Kho

    1 year ago
    OK, so with cache_for, the mapped children after the first won't run, because they all reuse the cache generated by the first child's run. You should pair it with a cache_validator that also checks the task's inputs.
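A simplified model of what an input-checking validator changes. It assumes, as a sketch, that all mapped children share one list of cached entries, each stored with the inputs that produced it and an expiration time. `check_duration` loosely mirrors a duration-only check (Prefect 1.x documents `duration_only` as the default validator when `cache_for` is set), and `check_duration_and_inputs` loosely mirrors the idea behind `prefect.engine.cache_validators.all_inputs`; both functions, `run_mapped_cached`, and `make_record` are hypothetical and are not Prefect's implementation.

```python
from datetime import datetime, timedelta


def check_duration(entry, inputs, now):
    # Valid as long as the entry has not expired; inputs are ignored.
    return now < entry["expires"]


def check_duration_and_inputs(entry, inputs, now):
    # Valid only if unexpired AND produced by exactly the same inputs.
    return now < entry["expires"] and entry["inputs"] == inputs


def run_mapped_cached(fn, inputs, store, validator, now, cache_for=timedelta(hours=1)):
    """Mapped children share one list of cached entries (one task, one key)."""
    outputs = []
    for x in inputs:
        call_inputs = {"x": x}
        hit = next((e for e in store if validator(e, call_inputs, now)), None)
        if hit is not None:
            outputs.append(hit["value"])  # reuse the cached value
            continue
        value = fn(x)
        store.append({"inputs": call_inputs, "value": value, "expires": now + cache_for})
        outputs.append(value)
    return outputs


def make_record(x):
    # Stand-in for the task body from the reproduction above.
    return {"num": x}


now = datetime(2021, 1, 1)
print(run_mapped_cached(make_record, [1, 2, 3], [], check_duration, now))
# [{'num': 1}, {'num': 1}, {'num': 1}]
print(run_mapped_cached(make_record, [1, 2, 3], [], check_duration_and_inputs, now))
# [{'num': 1}, {'num': 2}, {'num': 3}]
```

With the input check, a child only reuses an entry produced by identical inputs, so each child with a distinct input runs once and later runs with the same inputs hit the cache.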
    emre

    1 year ago
    Thanks for following up after all this time! I don't think I understand what's going on right now. I set up cache_validator=all_parameters:
    • Without a target param, the old issue persists: the first mapped task runs and the others use its result.
    • With a target param, every mapped task generates its own result and uses that cached result. However, target doesn't seem to respect task inputs at all, contrary to the cache_validator setting.
    Kevin Kho

    1 year ago
    target and cache_for are two separate routes in the code base. target is based on file existence. Which cache validator are you using? I think you have all_inputs?