# prefect-community
f
Hi all, I have the following problem. Currently I have two tasks defined similar to the following:
@task(
    name="Extract",
    checkpoint=True,
    result=LocalResult(cache_dir),
    target="{task_name}--{today}"
)
def extract(x):
    return 1

@task(
    name="Transform",
    checkpoint=True,
    result=LocalResult(cache_dir),
    target="{task_name}--{today}"
)
def transform(x):
    return x * 2
Caching works with this example. With each run of the flow, Prefect looks for a file "transform--2021-13-01" and, if it exists, uses the cached result. I want to add the following features:
1. Whenever I change the source code of the `extract` task (say to `return 20` instead of `return 1`), I obviously don't want to use the cached result (i.e. `1`) as an input for the next task. Instead, I want `extract` to recompute whenever I change its content. How can I do that?
2. When the result of an upstream task changes, I want to execute the DAG from that point downwards (even though the downstream tasks might also be cached). For instance, in this example, I want `transform` to take the new input of 20 and compute again with that input (although its result is already cached).
j
Hi @Felix Schran
1. The context values you are using in the target mean that the cached result will be used whenever the task name is the same and it is the same day. I recommend looking into other context values you can use for this, notably `flow_id` (which will cache once for each new version of the flow).
2. The DAG will recompute downwards if it is not cached. Since you are still using task_name + today as the target on your transform task, it won't run again until the task name is different or the day has changed. Perhaps look into using `flow_run_id`, which will let you cache results for a particular run: if you restart that run it will reuse the cached results, but they won't have a bearing on future runs.
f
Unfortunately, I am not using Prefect Cloud, so `flow_id` just seems to be the name of the flow and does not really solve the problem. Would it somehow be possible to hash the source code of a task and use that hash as a context value? Whenever I changed something in the code, the hash would then change as well.
j
Yeah you could attempt something like that. Outside of using a backend to orchestrate the flow you will probably need some manual process like that
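A rough sketch of what such a manual process could look like, using only the standard library. The helper names `code_fingerprint` and `target_template` are made up for illustration and are not part of Prefect. The idea is to hash the plain function before it is wrapped by `@task` and bake the digest into the `target` template, so a code change produces a new file name and the old cache is simply not found. How this behaves in a real flow should be checked against your Prefect version.

```python
import hashlib
import inspect


def code_fingerprint(fn, length=12):
    """Return a short hex digest that changes when the code of `fn` changes."""
    try:
        payload = inspect.getsource(fn).encode("utf-8")
    except (OSError, TypeError):
        # Source text is unavailable (e.g. some interactive sessions):
        # fall back to the compiled bytecode plus its constants.
        code = fn.__code__
        payload = code.co_code + repr(code.co_consts).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:length]


def target_template(fn):
    # Doubled braces keep {task_name} and {today} as placeholders,
    # so they can still be filled in later from the context values.
    return f"{{task_name}}--{{today}}--{code_fingerprint(fn)}"


# Two versions of the same task body, standing in for "before" and "after" an edit.
def extract_v1(x):
    return 1


def extract_v2(x):
    return 20


print(target_template(extract_v1))
print(target_template(extract_v2))
```

The resulting string would then be passed as `target=target_template(extract_fn)` when building the task, where `extract_fn` is the undecorated function. Note that the hash covers only the function itself, not helpers it calls.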
f
Regarding my point 2 from above, I found this question on Stack Overflow: https://stackoverflow.com/questions/65600488/in-prefect-core-is-there-a-simple-way-to-invalidate-cache-if-input-of-upstream-t I think it would be really valuable to add an option to the flow so that the downstream DAG is executed if the cache of an upstream task has changed.
Any ideas how to do that?
It looks like the cache validators might help here (https://docs.prefect.io/api/latest/engine/cache_validators.html#functions). But they seem to only work when `cache_for` is also used as an argument. Any chance of getting them to work with the `target` option as well?
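One possible workaround, sketched below, is to fold a digest of the task's input into the target name, so that a changed upstream result leads to a different file and the downstream task runs again. This assumes the installed Prefect version accepts a callable for `target` and calls it with the context values and the task inputs as keyword arguments; that assumption should be verified before relying on it. `input_fingerprint` and `transform_target` are hypothetical names, not Prefect functions.

```python
import hashlib
import pickle


def input_fingerprint(value, length=12):
    """Short digest of a picklable input value.

    Pickle output is stable for simple values such as ints and strings;
    for sets or custom objects a canonical serialization would be safer.
    """
    return hashlib.sha256(pickle.dumps(value)).hexdigest()[:length]


def transform_target(task_name, today, x, **_ignored):
    # Hypothetical callable target: `x` is the input of `transform`;
    # any other keyword arguments passed in are ignored.
    return f"{task_name}--{today}--{input_fingerprint(x)}"


# Same day and task name, but a different upstream result gives a different target.
old = transform_target(task_name="Transform", today="2021-01-13", x=1)
new = transform_target(task_name="Transform", today="2021-01-13", x=20)
print(old)
print(new)
```

The trade-off is that stale files for old inputs stay in the cache directory until they are cleaned up separately.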