    Yanghui Ou

    1 year ago
    Hi, I am doing some simple experiments with the caching system and ran into some issues. Here’s the code I am running:
    from prefect.engine.results import LocalResult
    from prefect import task, Flow
    
    @task(target="add_target.txt", checkpoint=True, result=LocalResult(dir="."))
    def add(x, y=1):
        ret = x + y
        return ret
    
    @task
    def print_value( x ):
        print( x )
    
    with Flow("my handled flow!", result=LocalResult()) as flow:
        first_result  = add(1, y=2)
        second_result = add(x=first_result, y=100)
        print_value( second_result )
    
    flow.run()
    The `add` task only runs once and the final result is always 3. I read the code for `LocalResult` a bit and it seems to me that `LocalResult.exists` simply checks whether the target file exists. If it does, the task enters the cached state. So how can I get this example to work? Another issue is that even if I have just one `add` task in the flow, after modifying the source code to change its input, it still enters the cached state. How can I make the result handler check the task inputs as well?
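The behavior described in this question can be reproduced in plain Python. The sketch below is a toy stand-in, not Prefect's implementation: `run_with_target` is a hypothetical helper that reuses a stored value whenever the target file exists and never looks at the inputs.

```python
import os
import tempfile


def run_with_target(fn, path, **inputs):
    """Hypothetical helper: reuse the stored value if the target file exists."""
    if os.path.exists(path):
        # Only the file's existence is checked; the inputs are ignored.
        with open(path) as f:
            return int(f.read())
    value = fn(**inputs)
    with open(path, "w") as f:
        f.write(str(value))
    return value


def add(x, y=1):
    return x + y


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "add_target.txt")
    first = run_with_target(add, path, x=1, y=2)
    second = run_with_target(add, path, x=first, y=100)
    print(first, second)  # 3 3
```

Both calls share one file name, so the second call finds the file written by the first and returns 3 instead of 103.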
    Chris White

    1 year ago
    Hi @Yanghui Ou check out this doc on caching in Prefect: https://docs.prefect.io/core/concepts/persistence.html
    If you want to use file-based targets as in your example, then you’ll need to template the location to take things like inputs into account. Otherwise, if you’d rather be more explicit about it, you’ll want to refactor away from using `target` and instead use the `cache_for` and `cache_validator` settings on your task.
    Yanghui Ou

    1 year ago
    Thanks! I’m not sure how I can access the inputs in the template string though. `cache_for` and `cache_validator` sound very relevant! I’ll definitely check them out!
    Chris White

    1 year ago
    Awesome, and just so you know for future reference: the string you pass to `target` can include Python string formatting expressions that reference any of:
    - variables from context (all variables can be found here: https://docs.prefect.io/api/latest/utilities/context.html)
    - flow-level parameter names
    - inputs to your task
    Using your example above, you could do:
    @task(target="add_target_{x}_{y}.txt", checkpoint=True, result=LocalResult(dir="."))
    def add(x, y=1):
        ret = x + y
        return ret
    so that the file name references the input values that were provided. Note that this assumes the inputs have reasonable string representations! If they don’t, you could also pass a fully custom callable function as a target:
    def my_target(**kwargs):
        # custom logic based on kwargs['x'] and kwargs['y']
        return "some-string-that-represents-the-desired-filepath"
    Apologies if this is information overload, but hopefully some piece of it helps you achieve your goal!
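To make the templating concrete, here is a minimal sketch in plain Python of how a template string or a callable could turn task inputs into distinct file names. `resolve_target` is a hypothetical helper written for illustration, not a Prefect function, and it assumes the inputs arrive as keyword arguments as described above.

```python
def resolve_target(target, **inputs):
    """Hypothetical helper: turn a template string or a callable into a file name."""
    if callable(target):
        return target(**inputs)
    # str.format ignores keyword arguments that the template does not mention.
    return target.format(**inputs)


def my_target(**kwargs):
    # Custom logic based on kwargs['x'] and kwargs['y'].
    return "add_{}_plus_{}.txt".format(kwargs["x"], kwargs["y"])


template = "add_target_{x}_{y}.txt"
first_name = resolve_target(template, x=1, y=2)
second_name = resolve_target(template, x=3, y=100)
custom_name = resolve_target(my_target, x=3, y=100)

print(first_name)   # add_target_1_2.txt
print(second_name)  # add_target_3_100.txt
print(custom_name)  # add_3_plus_100.txt
```

Because each combination of inputs maps to its own file, an existence check on the target no longer confuses `add(1, y=2)` with `add(3, y=100)`.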
    Yanghui Ou

    1 year ago
    Thank you for the informative answer! Yeah, I know about the support for string formatting expressions but I didn’t notice that I can directly use the parameter names. I wonder what would happen if the parameter names conflict with the reserved variable names? Anyway, I think for my purpose I’ll need to hack deeper into the caching system (i.e. look into `cache_for` and `cache_validator`).
    Krzysztof Nawara

    1 year ago
    @Chris White Is it documented somewhere that you can use inputs to template the target? I thought I had read everything I could find about caching/checkpointing but I've never seen that information. And it's an awesome feature
    Chris White

    1 year ago
    Honestly, it might not be! I’ll open an issue so that we can make sure it gets documented properly 🙂 @Marvin open “Explicitly document the variables that can be used in a task target template”
    Marvin

    1 year ago
    Yanghui Ou

    1 year ago
    @Chris White Hi, I just tried using `cache_validator` but it does not solve my problem. I tried
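    from datetime import timedelta
    from prefect import task
    from prefect.engine.cache_validators import all_inputs
    from prefect.engine.results import LocalResult

    @task(
        cache_for=timedelta(days=1),
        cache_validator=all_inputs,
        target="add_target.txt",
        checkpoint=True,
        result=LocalResult(dir="."),
    )
    def add(x, y=1):
        ret = x + y
        return ret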
    which doesn’t stop the task from entering the “Cached” state once it sees that `add_target.txt` still exists. Removing the `target` does yield the correct result:
    @task(
      cache_for=timedelta(days=1),
      cache_validator=all_inputs,
    )
    def add(x, y=1):
        ret = x + y
        return ret
    However, it seems to me that it is using some in-memory cache? I think I really need to use persisted results for my project. What can I do to make it work? Can I somehow access the task inputs/parameters in the `Result.exists` method? Seems not?
    Chris White

    1 year ago
    You are overloading the caching mechanism in your first code snippet; targets and cache validators are two separate features (and providing a target takes precedence). This is why your second example works. (Side note: we should raise a warning when both are provided, to avoid confusion.) For persistence between flow processes, you will need to use a Prefect backend (either Server or Cloud).
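For contrast with file targets, the idea behind an expiry plus an input-checking validator can be sketched in plain Python. This is a toy model under stated assumptions, not Prefect's implementation: `InputCheckedCache` is a hypothetical class that keeps one entry in memory and reuses it only when the inputs are equal and the entry has not expired.

```python
from datetime import datetime, timedelta


class InputCheckedCache:
    """Toy cache: reuse a value only for equal inputs within its lifetime."""

    def __init__(self, cache_for):
        self.cache_for = cache_for
        self._entry = None  # (inputs, value, expires_at)

    def run(self, fn, now, **inputs):
        if self._entry is not None:
            cached_inputs, value, expires_at = self._entry
            if cached_inputs == inputs and now < expires_at:
                return value, True  # cache hit
        value = fn(**inputs)
        self._entry = (dict(inputs), value, now + self.cache_for)
        return value, False  # freshly computed


def add(x, y=1):
    return x + y


cache = InputCheckedCache(cache_for=timedelta(days=1))
start = datetime(2021, 1, 1)

r1 = cache.run(add, start, x=1, y=2)      # (3, False): first run
r2 = cache.run(add, start, x=1, y=2)      # (3, True): same inputs, not expired
r3 = cache.run(add, start, x=3, y=100)    # (103, False): inputs changed
r4 = cache.run(add, start + timedelta(days=2), x=3, y=100)  # (103, False): expired
```

The entry lives only in the Python process here, which matches the observation in the thread that this style of caching does not persist between flow processes without a backend.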
    Yanghui Ou

    1 year ago
    Thanks for the reply! Eventually I am trying to build a workflow to replace my current Makefile-based flow, so the tasks are more file-centric (i.e. each task generates some side-effect files) and that’s why I am looking into persisted results. I tried implementing my own `Result` subclass but it seems that I am not able to detect whether the inputs have changed. Well, I guess I can do some really hacky things like encoding all the inputs into the target file name and decoding them in the result handler. It would be nice if there were a cleaner solution.
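One variant of the "encode the inputs into the file name" idea avoids the decoding step by hashing the inputs instead. The sketch below is an assumption-laden illustration, untested against Prefect: `hashed_target` is a hypothetical callable in the style of `my_target(**kwargs)` from earlier in the thread, and the input names `x` and `y` come from the `add` example.

```python
import hashlib
import json


def hashed_target(**kwargs):
    """Hypothetical target callable: derive the file name from a hash of the inputs."""
    # Pick only the task's own inputs; kwargs may carry other variables too.
    inputs = {name: kwargs[name] for name in ("x", "y")}
    # sort_keys gives a stable serialization; repr is a fallback for non-JSON values.
    payload = json.dumps(inputs, sort_keys=True, default=repr)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
    return "add_target_{}.txt".format(digest)


same_a = hashed_target(x=1, y=2)
same_b = hashed_target(y=2, x=1, unrelated="ignored")
different = hashed_target(x=3, y=100)
```

Equal inputs always map to the same file name and changed inputs map to a new one, so a plain existence check on the target is enough and nothing has to be decoded in the result handler. The trade-off is that stale files for old inputs accumulate until something cleans them up.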