
Guillaume Latour

05/24/2022, 6:57 AM
Hello, I have an issue with the intermediate results of a flow's tasks taking up too much space. My setup is as follows:
• 1 prefect server (launched with docker-compose) and 1 (local) prefect agent on the same server (sharing the ~/.prefect/config.toml configuration)
• 1 dask cluster (launched with docker on another server) with the Python libraries needed for the flows to execute correctly; this cluster does not have any prefect configuration
So obviously I am using a DaskExecutor, which is correctly configured in this ~/.prefect/config.toml file. What I see is that the Docker containers create a /home/<user>/.prefect/results folder, with <user> being the user who launches the prefect server & agent on the other server (and who does not exist in the containers). I've added this line to the configuration:
flows.checkpointing = false
then relaunched the server and agent, but nothing has changed: the results folder is still being filled with intermediate task results. Am I doing something wrong? Is it possible to prevent these intermediate backups? Thanks in advance

Anna Geller

05/24/2022, 11:14 AM
Is it possible to prevent these intermediate backups?
Yes, it is! It involves some work, though, because you need to set it on each task: passing checkpoint=False to your task decorator will prevent that.

Guillaume Latour

05/24/2022, 12:54 PM
argh, I was really hoping for something else 😢 ok, let's do that and see what comes out of it

Anna Geller

05/24/2022, 12:59 PM
I totally feel your pain here 😄 maybe you can build a custom task decorator?
I've found this: @Kevin Kho, could you add a usage example here, e.g. to show how one could use this decorator to set checkpoint=False automatically?
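As a usage example of the kind Anna asks for, here is a minimal sketch of a decorator wrapper that forces checkpoint=False. It assumes Prefect 1.x, where `prefect.task` has the call shape `task(fn=None, **task_init_kwargs)` and works both as bare `@task` and as `@task(...)`. To keep the sketch runnable without Prefect, `task` below is a hypothetical stand-in with that same call shape; `with_forced_kwargs` and `no_checkpoint_task` are hypothetical names too.

```python
from typing import Any, Callable, Optional


# Hypothetical stand-in mimicking Prefect 1.x's `task(fn=None, **task_init_kwargs)`
# call shape; in a real project this would be `from prefect import task`.
def task(fn: Optional[Callable] = None, **task_init_kwargs: Any):
    def make(f: Callable) -> Callable:
        # Record the kwargs on the function so the result can be inspected.
        f.task_init_kwargs = task_init_kwargs
        return f

    # Bare @task passes the function directly; @task(...) passes only kwargs.
    return make if fn is None else make(fn)


def with_forced_kwargs(decorator: Callable, **forced: Any) -> Callable:
    """Wrap a `decorator(fn=None, **kwargs)` so that `forced` always wins."""

    def wrapper(fn: Optional[Callable] = None, **kwargs: Any):
        kwargs.update(forced)  # forced values override anything the caller passed
        return decorator(fn, **kwargs)

    return wrapper


# Hypothetical name: a drop-in decorator with checkpointing always disabled.
no_checkpoint_task = with_forced_kwargs(task, checkpoint=False)


@no_checkpoint_task
def extract():
    return [1, 2, 3]


@no_checkpoint_task(name="transform", checkpoint=True)  # True is overridden
def transform(data):
    return [x * 10 for x in data]


print(extract.task_init_kwargs)  # {'checkpoint': False}
print(transform.task_init_kwargs)  # {'name': 'transform', 'checkpoint': False}
```

With the real library, the stand-in would be replaced by `from prefect import task`, and a shared module could export `task = with_forced_kwargs(task, checkpoint=False)` so call sites keep writing `@task`. A plain `functools.partial(task, checkpoint=False)` would also cover both decorator forms, but there a caller's explicit `checkpoint=True` would win over the partial's keyword, whereas the wrapper above always forces `False`.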

Guillaume Latour

05/24/2022, 1:19 PM
yeah ofc, but I still need to update all files for a different import 🙂

Anna Geller

05/24/2022, 1:51 PM
that's true, still a pain
fwiw I discussed this problem with the product team so that such results configuration is easier to handle in 2.0 🤞

Kevin Kho

05/24/2022, 2:24 PM
Yeah, unfortunately this is hardcoded by the backend, so a custom decorator is the best thing you can do, or adding checkpoint=False to each task.

Guillaume Latour

05/26/2022, 2:03 PM
ok, so I've added this to the code that defines the tasks/flows:
from typing import Any, Callable, Optional

from prefect import task


def checkpoint_task(func: Optional[Callable] = None, **task_init_kwargs: Any):
    # **task_init_kwargs is always a dict (never None), so no None check is needed
    task_init_kwargs['checkpoint'] = False  # always disable checkpointing
    # func=None also supports the @checkpoint_task(...) form, not just bare @checkpoint_task
    return task(func, **task_init_kwargs)
And I've updated all the task imports so they look like this:
import checkpoint_task as task
I've built a docker image that includes this new code for the dask cluster, and finally I've registered the newly defined flows with the prefect server. I am still having the issue on the dask workers: they create a folder in /home/<user>/.prefect/results (with <user> being the one running the prefect agent). Do you have an idea of how I could debug this? Where could this hardcoded checkpoint = False be overridden? Is there some caching mechanism preventing the registration of the updated code?

Kevin Kho

05/26/2022, 2:28 PM
We don’t hold your code, but I think you do need to re-register the new version, because checkpoint is part of what is stored in the task. Did you re-register this Flow?
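One way to follow up on the re-registration question is to inspect the flow object itself right before registering it. A minimal sketch, assuming Prefect 1.x, where `flow.tasks` holds the flow's tasks and each task carries `name` and `checkpoint` attributes; `find_checkpointed_tasks` is a hypothetical helper, and the demo uses `SimpleNamespace` stand-ins so it runs without Prefect.

```python
from types import SimpleNamespace
from typing import Any, Iterable, List


def find_checkpointed_tasks(tasks: Iterable[Any]) -> List[str]:
    """Hypothetical helper: names of tasks whose `checkpoint` is not explicitly False.

    Anything other than False (True, or an unset default) is reported, since such
    tasks may still write results if the backend turns checkpointing on.
    """
    offenders = []
    for t in tasks:
        if getattr(t, "checkpoint", None) is not False:
            offenders.append(getattr(t, "name", repr(t)))
    return sorted(offenders)


# Demo with stand-in objects; with Prefect 1.x you would pass `flow.tasks`.
demo_tasks = [
    SimpleNamespace(name="extract", checkpoint=False),
    SimpleNamespace(name="load", checkpoint=None),
]
print(find_checkpointed_tasks(demo_tasks))  # ['load']
```

Called in the registration script just before `flow.register(...)`, this would show whether any task slipped through with the plain `prefect` import. It only inspects the locally built flow, not what the Dask workers end up doing at run time.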

Guillaume Latour

05/26/2022, 2:34 PM
yes I did

Kevin Kho

05/26/2022, 2:37 PM
Hmm, that’s weird. I can’t think of anything obvious here, other than suggesting you try a simple Flow with your custom task function and see whether it still happens. What is your DaskExecutor setup? A temporary cluster on Kubernetes?

Guillaume Latour

05/26/2022, 2:54 PM
I provide dask.address = "tcp://<ip>:<port>" in my .prefect/config.toml file. I am using the daskdev/dask:latest docker image, on which I install our custom packages, and I launch a small cluster (1 scheduler and some workers). We do not use k8s yet.

Kevin Kho

05/26/2022, 2:56 PM
I am wondering how the work happens at all. I thought the Dask cluster needed prefect installed in the image.
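To settle Kevin's question, one could ask every Dask worker what it actually has installed. A minimal sketch, assuming `dask.distributed` is available on the client side: its `Client.run` executes a function on every worker and returns a dict keyed by worker address. `probe_worker_environment` is a hypothetical helper, and reading `PREFECT__FLOWS__CHECKPOINTING` (the environment-variable form of Prefect 1.x's `flows.checkpointing` setting) is only a guess at where an override might show up.

```python
import importlib.metadata
import os
from pathlib import Path
from typing import Any, Dict, Optional


def probe_worker_environment(results_dir: str) -> Dict[str, Any]:
    """Hypothetical debugging helper: report what one (worker) process sees."""
    try:
        prefect_version: Optional[str] = importlib.metadata.version("prefect")
    except importlib.metadata.PackageNotFoundError:
        prefect_version = None  # prefect is not installed in this environment
    return {
        "prefect_version": prefect_version,
        "home": str(Path.home()),
        # Env-var form of flows.checkpointing, if anything set it for this process
        "checkpointing_env": os.environ.get("PREFECT__FLOWS__CHECKPOINTING"),
        # Whether the given results folder exists from this process's point of view
        "results_dir_exists": Path(results_dir).is_dir(),
    }
```

On the cluster this would be called as `Client("tcp://<ip>:<port>").run(probe_worker_environment, "/home/<user>/.prefect/results")`, with `Client` imported from `dask.distributed`; a `None` for `prefect_version` on a worker would mean prefect is not installed in that image.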