https://prefect.io logo
k

Kivanc Yuksel

02/19/2022, 3:18 PM
Hi! I have some long running tasks that I cache their output with
target
, however, from time to time I want to re-run these tasks without manually deleting target files. Is there a way to "force" re-run for such tasks?
a

Anna Geller

02/19/2022, 4:16 PM
No, there isn't. Targets are file-based persistence so you would need to delete those files manually to rerun the job, or you would need to comment out the targets and rerun the flow without setting targets at all. As an alternative to targets, you could use cache_for to implement time-based caching. This would allow you to influence for how long the task results are cached and when the tasks will be rerun instead of being cached. One last thing you could do is to use a more fine-grained templating - if your targets were specifying daily files, you could change that to e.g. hourly files to force rerunning of those more frequently.
k

Kivanc Yuksel

02/19/2022, 4:32 PM
The task I have depends on some configuration parameters as well as contents of a number of files. I have a special class which generates a unique directory name based on these parameters and contents of depended files and the result of the task is saved under this directory. With this way whenever I run this task with the same parameters, and contents of depended files, I can used the cached results. Do you think that it would be possible to use
cache_for
and
cache_validator
for this purpose to accomplish what I need?
a

Anna Geller

02/19/2022, 10:17 PM
Can you share the flow configuration you have so far?
k

Kivanc Yuksel

02/19/2022, 10:42 PM
Sure, it is something like this:
Copy code
from pathlib import Path

target_location = Path("/path/to/dataset.csv")
file_path1 = Path("/path/to/some/file.txt")
file_path2 = Path("/path/to/some/file.csv")

cache_directory_generator = CacheDirectoryGenerator(
    file_dependencies=[file_path1, file_path2],
    parameter_dependencies={"some_param": 3}
)
target_location = cache_directory_generator.get_cache_path(target_location) 
# target_location is now something like: "/path/to/04f6b63d7b27281742d7bf14e4e8323c/dataset.csv"

serializer = PandasSerializer(file_type="csv")
result = LocalResult(dir=str(target_location.parent), serializer=serializer)

long_running_task = SomeLongRunningTask(
    checkpoint=True,
    result=result,
    target=target_location.name,
    skip_on_upstream_skip=True,
)

with Flow("process-raw-data") as flow:
    output = long_running_task()

flow.run()
a

Anna Geller

02/20/2022, 11:43 AM
So your class in the end returns a string of the path, correct? Do you know that you can use templating and even provide a callable to the target? So there's a lot you can do here wit targets without having to use cache_for actually. You mention you want to cache based on parameters - if you mean based on task inputs, you could use cache validator like so:
Copy code
cache_for=datetime.timedelta(hours=1),
    cache_validator=prefect.engine.cache_validators.all_parameters)
k

Kivanc Yuksel

02/20/2022, 1:21 PM
Thank you for the response. Yes, the purpose of
CacheDirectoryGenerator
is to generate a unique path based on provided parameters and file dependencies. Yes, I know about templating, and the thing I do is actually very similar to that, so, I don't see any additional benefit of using it. My class already handles unique path generation and the resulting
target
is past to the task as is. Also, the problem I have is not that the unique location generator based on parameters and file dependencies doesn't work, it works. I just need a way to somehow "force" re-run, even though the output is cached, without manually deleting cached files, if I need to.
a

Anna Geller

02/20/2022, 1:51 PM
I see, I think using this target setup, you would need to manually delete files. But if you leverage more granular templating, you may avoid that and solve the issue this way because e.g. instead of providing one file per day, you may use templating to generate it hourly to force more frequent rerun (just one example). Also, you may stumble into some issues with your current setup if you use pickle storage because the path you provide may be frozen at registration (not 100% sure)
k

Kivanc Yuksel

02/20/2022, 2:19 PM
It would be really nice if I could avoid manually deleting files, because the folder the cached files resides in is automatically generated based on parameter and other file dependencies. I would need an addition run in order to get which cached files I need to delete. It is possible, but uneasy. In most cases I don't want to rerun the same task if the same parameters and file dependencies are used. I would want to re-run a cached task only if, for example, I discovered and fixed a bug or something. To give a little bit more context, I work on a Machine Learning project in which I need to process big amount of data, then I train some ML model using the processed data. The data is processed based on some parameters, and I need to experiment with many different parameters to find the best performing model. I don't want to re-run the processing task if the parameters I used to process the data hasn't changed, and also I should be able to easily switch between data that is processed with some particular set of parameters. I am not sure if I understand the possible problem I might encounter, could you please give a little bit more details about it? 🤔
a

Anna Geller

02/20/2022, 2:47 PM
thanks for explaining your use case, that's always helpful! In that case, I think the best option would be to move the file name as input arguments to the task and then specify caching based on input parameters.
Copy code
cache_for=datetime.timedelta(hours=1),
    cache_validator=prefect.engine.cache_validators.all_parameters)
This will ensure that the task won't be rerun when the task input parameters (and file paths which now will also be as task inputs) haven't changed. Also, check out this blog post from Kevin - it discusses a very similar use case https://www.prefect.io/blog/introducing-prefect-ml-orchestrate-a-distributed-hyperparameter-grid-search/
e

emre

02/20/2022, 3:09 PM
Maybe you could add a task downstream to path generation, but upstream to your cached task. This new task would delete the cached file if it exists AND if you explicitly want it to be rerun (specify with a flow param maybe?). This way you would keep using
target
, and be able to flush your cache automatically based on user input.
upvote 1
k

Kivanc Yuksel

02/20/2022, 6:00 PM
@Anna Geller thanks for the help, and for the link 🙂 @emre Yes, I thought about this too, however, the problem with this approach is that you need to write custom "erasers" for different type of `Result`s. It is not a big problem, definitely it can be done. I just wanted to be sure that there isn't an easier way of doing it, in order not to "over engineer" stuff. It appears to me right now that there isn't actually an easier way, so I will do it like you suggested, thanks!
🙌 2
4 Views