Yanghui Ou
10/19/2020, 11:06 PM
```python
from prefect import Flow, task
from prefect.engine.results import LocalResult


# Both add() calls below share this fixed target file name.
@task(target="add_target.txt", checkpoint=True, result=LocalResult(dir="."))
def add(x, y=1):
    ret = x + y
    return ret


@task
def print_value(x):
    print(x)


with Flow("my handled flow!", result=LocalResult()) as flow:
    first_result = add(1, y=2)
    second_result = add(x=first_result, y=100)
    print_value(second_result)

flow.run()
```
The `add` task only runs once and the final result is always 3. I read the code for `LocalResult` a bit, and it seems to me that `LocalResult.exists` simply checks whether the target file exists; if it does, the task enters the Cached state. So how can I get this example to work?
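To make the symptom concrete, here is a deliberately simplified, Prefect-free model of the behaviour described above: the only thing checked before running is whether the target location already exists, so the task inputs never come into play. `FakeResultStore` and `run_add` are hypothetical stand-ins, not Prefect APIs, and this is not Prefect's actual implementation.

```python
# Hypothetical, simplified model of a target-based cache check.
# NOT Prefect's implementation: it only mimics the behaviour described
# above, where "does the target exist?" is the only question asked.


class FakeResultStore:
    """In-memory stand-in for a directory of checkpointed results."""

    def __init__(self):
        self._files = {}

    def exists(self, location):
        # Only the location is checked; the task inputs are never consulted.
        return location in self._files

    def read(self, location):
        return self._files[location]

    def write(self, location, value):
        self._files[location] = value


def run_add(store, target, x, y=1):
    """Compute x + y unless a result already exists at `target`."""
    if store.exists(target):
        return store.read(target)  # "Cached": the stored value is reused
    value = x + y
    store.write(target, value)
    return value


store = FakeResultStore()
first = run_add(store, "add_target.txt", 1, y=2)         # computes 3
second = run_add(store, "add_target.txt", first, y=100)  # file exists, reuses 3
print(first, second)  # 3 3
```

Giving each call a distinct target (for example one that encodes `x` and `y`) is what lets the second call compute 103 in this model.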
Another issue is that even if I have just one `add` task in the flow, it still enters the Cached state after I modify the source code to change its input. How can I make the result handler check the task inputs as well?
Chris White
`target` and instead use the `cache_for` and `cache_validator` settings on your task.
10/20/2020, 1:07 AM
`cache_for` and `cache_validator` sound very relevant! I'll definitely check them out!
Chris White
`target` can include Python string formatting expressions that reference any of:
- variables from context (all variables can be found here: https://docs.prefect.io/api/latest/utilities/context.html)
- flow-level parameter names
- inputs to your task
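As a rough, Prefect-free illustration of the list above: assume the template is filled with ordinary `str.format` over one merged mapping of flow parameters, context values and task inputs. The merge order used here (inputs win on key clashes) and the `render_target` helper are assumptions for this sketch, not Prefect's documented behaviour. Whatever a value's string form is ends up verbatim in the file name.

```python
def render_target(template, parameters, context, inputs):
    """Fill a target template from parameters, context and task inputs.

    Later mappings win on key clashes; this merge order is an assumption.
    """
    values = {**parameters, **context, **inputs}
    return template.format(**values)


parameters = {}
context = {"flow_name": "my handled flow!"}  # tiny illustrative subset

# Integers have short, deterministic string forms, so each input pair
# maps to its own file name.
print(render_target("add_target_{x}_{y}.txt", parameters, context, {"x": 1, "y": 2}))
# add_target_1_2.txt
print(render_target("add_target_{x}_{y}.txt", parameters, context, {"x": 3, "y": 100}))
# add_target_3_100.txt

# A value without a custom __str__ falls back to its default repr, which
# includes a memory address and so is unlikely to match on a later run.
print(render_target("add_target_{x}.txt", parameters, context, {"x": object()}))
```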
Using your example above, you could do:
```python
from prefect import task
from prefect.engine.results import LocalResult

@task(target="add_target_{x}_{y}.txt", checkpoint=True, result=LocalResult(dir="."))
def add(x, y=1):
    ret = x + y
    return ret
```
so that the file name references the input values that were provided. Note that this assumes the inputs have reasonable string representations! If they don't, you could also pass a fully custom callable function as a target:
```python
def my_target(**kwargs):
    # custom logic based on kwargs["x"] and kwargs["y"]
    return "some-string-that-represents-the-desired-filepath"
```
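For inputs that don't format nicely, one option for such a callable is to hash a canonical serialization of the inputs. The sketch below, `hashed_add_target`, is hypothetical; it assumes the callable receives the task inputs as keyword arguments named `x` and `y` (as in the comment in `my_target`) and that both inputs are JSON-serializable.

```python
import hashlib
import json


def hashed_add_target(**kwargs):
    """Hypothetical target callable: one file per distinct (x, y) pair.

    Assumes kwargs carries the task inputs `x` and `y` alongside other
    entries (ignored here) and that both inputs are JSON-serializable.
    `y` falls back to 1, mirroring add()'s default.
    """
    relevant = {"x": kwargs["x"], "y": kwargs.get("y", 1)}
    # sort_keys gives a canonical serialization regardless of dict order.
    payload = json.dumps(relevant, sort_keys=True).encode("utf-8")
    digest = hashlib.sha256(payload).hexdigest()[:16]
    return f"add_target_{digest}.txt"


a = hashed_add_target(x=1, y=2, flow_name="demo")
b = hashed_add_target(y=2, x=1)
c = hashed_add_target(x=3, y=100)
print(a == b, a == c)  # True False
```

Assuming your Prefect version accepts a callable `target` as described above, it would be passed as `target=hashed_add_target`. Keeping 16 hex characters (64 bits) of the digest keeps file names short; collisions are unlikely but not impossible. Note that JSON distinguishes `1` from `1.0`, so those would get different files.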
Apologies if this is information overload, but hopefully some piece of it helps you achieve your goal!
Yanghui Ou
10/20/2020, 1:37 AM
`cache_for` and `cache_validator`).
Krzysztof Nawara
10/20/2020, 9:02 PM
Chris White
Marvin
10/20/2020, 9:46 PM
Yanghui Ou
10/21/2020, 10:01 PM
`cache_validator` but it does not solve my problem. I tried
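```python
from datetime import timedelta

from prefect import task
from prefect.engine.cache_validators import all_inputs
from prefect.engine.results import LocalResult


@task(
    cache_for=timedelta(days=1),
    cache_validator=all_inputs,
    target="add_target.txt",
    checkpoint=True,
    result=LocalResult(dir="."),
)
def add(x, y=1):
    ret = x + y
    return ret
```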
which doesn't stop the task from entering the “Cached” state when it sees that `add_target.txt` still exists.
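One way to read this observation: when a `target` is set, the existence check alone decides whether the task runs, and the `cache_validator` never gets a say. The toy decision procedure below models that precedence so the two paths can be compared side by side. `should_run` and `all_inputs_match` are hypothetical names, and this is a sketch consistent with the behaviour reported here, not Prefect's internals.

```python
from datetime import datetime, timedelta


def all_inputs_match(cached_inputs, new_inputs):
    # The idea behind an "all inputs" validator: reuse the cache only if
    # every input is identical.
    return cached_inputs == new_inputs


def should_run(target, target_exists, cache_entry, inputs, now):
    """Toy model; returns True if the task body should execute.

    cache_entry is either None or a (cached_inputs, expires_at) tuple.
    """
    if target is not None:
        # Target path: only existence matters, inputs are ignored.
        return not target_exists(target)
    if cache_entry is None:
        return True
    cached_inputs, expires_at = cache_entry
    if now >= expires_at:  # cache_for window has elapsed
        return True
    return not all_inputs_match(cached_inputs, inputs)


now = datetime(2020, 10, 21)
entry = ({"x": 1, "y": 2}, now + timedelta(days=1))
on_disk = {"add_target.txt"}

# With a target, changed inputs do not trigger a re-run.
print(should_run("add_target.txt", on_disk.__contains__, entry, {"x": 5, "y": 2}, now))  # False
# Without a target, the input comparison does.
print(should_run(None, on_disk.__contains__, entry, {"x": 5, "y": 2}, now))  # True
```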
Removing the `target` does yield the correct result:
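```python
from datetime import timedelta

from prefect import task
from prefect.engine.cache_validators import all_inputs


@task(
    cache_for=timedelta(days=1),
    cache_validator=all_inputs,
)
def add(x, y=1):
    ret = x + y
    return ret
```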
However, it seems to me that it is using some in-memory cache? I think I really need persisted results for my project. What can I do to make it work? Can I somehow access the task inputs/parameters in the `Result.exists` method? It seems not?
Chris White
Yanghui Ou
10/21/2020, 10:33 PM
`Result` subclass, but it seems that I am not able to detect whether the inputs have changed. Well, I guess I could do some really hacky things, like encoding all the inputs into the target file name and decoding them in the result handler. It would be nice if there were a cleaner solution.
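The "encode the inputs into the target file name" idea can at least be made reversible with the standard library. A minimal sketch, assuming JSON-serializable inputs and a filesystem that accepts the URL-safe base64 alphabet (letters, digits, `-`, `_`); `encode_target` and `decode_target` are hypothetical helpers, not Prefect APIs.

```python
import base64
import json


def encode_target(prefix, inputs):
    """Pack the inputs into a reversible, filesystem-friendly file name."""
    payload = json.dumps(inputs, sort_keys=True, separators=(",", ":"))
    token = base64.urlsafe_b64encode(payload.encode("utf-8")).decode("ascii")
    # Strip '=' padding to keep the name clean; decode_target restores it.
    return f"{prefix}_{token.rstrip('=')}.txt"


def decode_target(filename, prefix):
    """Recover the inputs dict from a name produced by encode_target."""
    token = filename[len(prefix) + 1 : -len(".txt")]
    token += "=" * (-len(token) % 4)  # restore base64 padding
    return json.loads(base64.urlsafe_b64decode(token).decode("utf-8"))


name = encode_target("add", {"x": 1, "y": 2})
print(name)
print(decode_target(name, "add"))  # {'x': 1, 'y': 2}
```

The design trade-off: the name grows with the size of the inputs and will eventually hit file-name length limits, whereas a hash stays fixed-length but cannot be decoded back into the inputs.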