    Felix Schran

    1 year ago
    Hi all, I have the following problem. Currently I have two tasks defined similar to the following:
    @task(
        name="Extract",
        checkpoint=True,
        result=LocalResult(cache_dir),
        target="{task_name}--{today}"
    )
    def extract(x):
        return 1

    @task(
        name="Transform",
        checkpoint=True,
        result=LocalResult(cache_dir),
        target="{task_name}--{today}"
    )
    def transform(x):
        return x * 2
    Caching works with this example. With each run of the flow, Prefect looks for a file "Transform--2021-01-13" and, if it exists, uses the cached result. I want to add the following features:
    1. Whenever I change the source code of the extract task (say to return 20 instead of return 1), I obviously don't want to use the cached result (i.e. 1) as an input for the next task. Instead I want extract to recompute whenever I change its content. How can I do that?
    2. When the result of an upstream task changes, I want to execute the DAG from that point downwards (although the downstream tasks might also be tasks with a cache). For instance, in this example, I want transform to take the new input of 20 and compute again with that input (although the result is already cached).
    josh

    1 year ago
    Hi @Felix Schran
    1. The context values you are setting mean the cached result will be used every time the task name is the same and it is the same day. I recommend looking into other context values you can use for this, notably flow_id (which will cache once for each new version of the flow).
    2. The DAG will recompute downwards if it is not cached, so since you are still using task_name + today as the target on your transform task, it won't run again until the task name is different or the day has changed. Perhaps look into using flow_run_id, which lets you cache results for a particular run: if you restart that run it will reuse the cached results, but it won't have a bearing on future runs.
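    To make the target suggestion concrete, here is a minimal, stdlib-only illustration of how a Prefect 1.x target template resolves against context values. task_name, today, and flow_run_id are real context keys; the values below are invented for the example:

    ```python
    # Illustration only: Prefect renders a task's `target` string against
    # its run context. The context values here are made up.
    context = {
        "task_name": "Transform",
        "today": "2021-01-13",
        "flow_run_id": "0f1e2d3c-aaaa-bbbb-cccc-000011112222",  # fake id
    }

    # Caching per day (the original setup) vs. caching per flow run:
    per_day = "{task_name}--{today}".format(**context)
    per_run = "{task_name}--{flow_run_id}".format(**context)

    print(per_day)  # Transform--2021-01-13
    print(per_run)  # Transform--0f1e2d3c-aaaa-bbbb-cccc-000011112222
    ```

    With the per-run target, a restarted run finds its own cache file and reuses it, while a brand-new run has a new flow_run_id and therefore recomputes.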
    Felix Schran

    1 year ago
    Unfortunately, I am not using Prefect Cloud, so flow_id just seems to be the name of the flow and doesn't really solve the problem. Would it somehow be possible to hash the source code of a task and use that hash as a context value? Whenever I changed something in the code, the hash would then change as well.
    josh

    1 year ago
    Yeah, you could attempt something like that. Outside of using a backend to orchestrate the flow, you will probably need a manual process like that.
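    A manual source-hash scheme like the one Felix describes could be sketched with the standard library alone. source_hash is a hypothetical helper, not part of Prefect; the resulting hash could then be baked into the task's target string:

    ```python
    import hashlib
    import inspect

    def source_hash(fn):
        """Return a short, stable hash of a function's source code.

        The hash changes whenever the function body is edited, so using
        it in a cache key invalidates the cache on every code change.
        """
        src = inspect.getsource(fn)
        return hashlib.sha256(src.encode("utf-8")).hexdigest()[:12]

    def extract(x):
        return 1

    # Hypothetical usage: include the hash in the task's target, e.g.
    # @task(target="{task_name}--{today}--" + source_hash(extract), ...)
    print(source_hash(extract))
    ```

    Note that inspect.getsource only sees the literal text of the function, so edits to helpers it calls would not change the hash; for that you would have to hash those functions too.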
    Felix Schran

    1 year ago
    Regarding my point 2 from above, I found this question on Stack Overflow: https://stackoverflow.com/questions/65600488/in-prefect-core-is-there-a-simple-way-to-invalidate-cache-if-input-of-upstream-t I think it would be really valuable to add an option to the flow so that the downstream DAG is executed if the cache of an upstream task has changed.
    Any ideas how to do that?
    It looks like the cache validators might help here (https://docs.prefect.io/api/latest/engine/cache_validators.html#functions), but they seem to only work when cache_for is also passed as an argument. Any chance of getting them to work with the target option as well?
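    For reference, the cache validators mentioned above (used together with cache_for) compare a cached state against the current run, and the all_inputs validator invalidates the cache when any upstream input changes. A minimal stdlib sketch of that idea, where inputs_fingerprint and cache_is_valid are hypothetical helpers and not Prefect API:

    ```python
    import hashlib
    import pickle

    def inputs_fingerprint(inputs):
        # Stable fingerprint of a task's keyword inputs.
        return hashlib.sha256(pickle.dumps(sorted(inputs.items()))).hexdigest()

    def cache_is_valid(cached_state, current_inputs):
        # Mirrors the idea behind cache_validators.all_inputs: the cached
        # result is reused only if every upstream input is unchanged.
        return cached_state["inputs_hash"] == inputs_fingerprint(current_inputs)

    # extract originally returned 1, so transform's cache was built on x=1:
    cached = {"inputs_hash": inputs_fingerprint({"x": 1}), "value": 2}

    print(cache_is_valid(cached, {"x": 1}))   # True: reuse the cached result
    print(cache_is_valid(cached, {"x": 20}))  # False: upstream changed, recompute
    ```

    This is exactly the behavior Felix wants for point 2: if extract starts returning 20, the fingerprint of transform's inputs no longer matches, so transform recomputes even though a cached result exists.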