# ask-community
m
Hey there! I am trying to set up checkpointing / result saving / output caching, and it seems everything is now working correctly when I run flows locally (I set
prefect.config.flows.checkpointing = True
somewhere in the code). But I can’t for the life of me get it to work when I use my Docker agent that’s connected to Prefect cloud. I set
--env PREFECT__FLOWS__CHECKPOINTING=true
as an env variable when I run the agent, but it never uses any checkpointing logic. Any ideas here? I’m more than happy to provide more info as needed
BTW I can tell it isn’t working because when I run it locally, a bunch of tasks succeed with final state
'Cached'
, whereas that doesn’t happen with the remote
k
Hey @Michael , was still thinking about the earlier msg btw. When running locally, the
flows.checkpointing
is respected, but when running with a backend (Cloud or Server), Prefect automatically sets it to True for you, so this should work. Target is a form of caching, so it makes sense they are cached locally. I am wondering if the
target
exists on the Docker container
Running the flow with DEBUG level logs gives you more info around this btw, but I think this should work.
Can you go into the container and check if the files exist?
m
Hmm. The target I’m using is a path in GCS (I use the GCS result), and the docker agent definitely has auth access to gcloud (no errors are thrown). Honestly to me it’s behaving as if checkpointing = False (which is super weird, because I did see that it’s always on with Prefect Cloud). I can run with DEBUG log levels to see if that reveals anything though
As for the earlier message, that at least I managed to work out! I’d happily post a reply explaining for others, though I’m not sure how many would find it useful
k
Can you respond there and I can archive with Marvin?
👍 1
m
This is what I see, basically nothing extra with DEBUG logging
It just enters the task run, seemingly without checking the target at all, which is totally not what happens when I run locally
k
Can I see how you set up your target?
m
It’s pretty convoluted and has some domain logic right now, but TL;DR: it’s a callable. I was wondering if that’s the issue (if at runtime the agent environment doesn’t know about the callable). It’s something like this:
def default_result_location(flow_name, task_full_name, parameters, **kwargs):
    # Hash all parameter values so the location is unique per parameter set
    all_params = [str(v) for v in parameters.values()]
    return f"{flow_name}/{task_full_name}/{cross_session_deterministic_hash('-'.join(all_params))}"
I only saw this in the docstrings in the Result & Task codebase actually, not documented online (the fact that you can specify these paths as callables that receive the runtime context as kwargs)
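[Editor's note: `cross_session_deterministic_hash` is the poster's own helper, not a Prefect API. A minimal stand-in, assuming all it needs to do is produce the same string across interpreter sessions — the builtin `hash` is salted per process, so `hashlib` is used instead:]

```python
import hashlib


def cross_session_deterministic_hash(s: str) -> str:
    # Builtin hash() is salted per interpreter session (PYTHONHASHSEED),
    # so use a cryptographic digest for a stable cross-run identifier.
    return hashlib.sha256(s.encode("utf-8")).hexdigest()[:16]


# Same input always yields the same location fragment, run after run
location = f"my_flow/my_task/{cross_session_deterministic_hash('a-b-c')}"
```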
k
I am positive it can be a callable based on this
Wow man you’re diving deep into the code! Lol. There is a doc on this one though I think ^
🙏 1
m
😅 so I pass default_result_location as both the Task
target
and the Result
location
(& BTW this works amazingly when I run locally, I was super impressed. Sped up the caching process a lot compared to implementation inside the tasks themselves, so just have to get it sorted in the Cloud).
k
I think the result will use the target already. When you run it, do you see anything on GCS? Or does nothing happen at all?
m
It’s super confusing, it’s just behaving exactly as if
checkpointing=False
across the board. I can delete the cached files in GCS and try again to see if they are created (so far I was trying to see whether it used the cache that I had created by running locally). I will try now from scratch in the cloud with no saved results
k
Yeah that would help us see if there is an issue
m
It doesn’t touch GCS. It is just ignoring the results completely
k
Can I see your task decorator with the result and target?
m
I have two functions that help with generating these:
from typing import Callable, List

from prefect.engine.result import Result
from prefect.engine.results import GCSResult


def create_result_location(task_args: List[str]) -> Callable:
    def result_location(flow_name, task_full_name, **kwargs):
        task_arg_vals = []
        for task_arg_name in task_args:
            task_arg_val = kwargs[task_arg_name]
            # isinstance needs the builtin list, not typing.List
            if isinstance(task_arg_val, list):
                task_arg_vals.append("|".join(task_arg_val))
            else:
                task_arg_vals.append(str(task_arg_val))
        return f"{flow_name}/{task_full_name}/{cross_session_deterministic_hash('-'.join(task_arg_vals))}"

    return result_location


def create_result_config(result_location: Callable) -> Result:
    return GCSResult(bucket=RESULT_BUCKET, location=result_location)
and then I use these two functions with every task (I use subclasses not decorators) as follows:
result_location = create_result_location(MyTask.DEPENDENCY_ARGS)
my_task = MyTask(
    name="my_task",
    target=result_location,
    result=create_result_config(result_location),
    checkpoint=True,
)
my_task.set_upstream(dependency_arg_1, key="dependency_arg_1")
...
MyTask.DEPENDENCY_ARGS
is just a list of strings specifying the names of the parameters that should uniquely determine the caching path for that particular task (i.e. allowing me to take a subset of the full
parameters
dict, the question I was wondering about in a separate thread). EDIT: I just tested using a single function for all tasks (i.e. no factory generating a callable per task, just one simple callable passed to each). This still doesn’t work when I try to run my Flow in the Cloud 🙃. Is it some issue with being on the free tier with Prefect Cloud??
BTW, you said above “_I think the result will use the target already_”, I just wanted to confirm that tasks do need the target in order for output caching to work. You don’t need it to save a result, but it is needed to prevent task execution if a cache exists (although as you can see from my code example, it is a little redundant). It would be cool if there was an arg to the task constructor or something like that so I don’t have to pass the target/location path in two places
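[Editor's note: as a mental model only (not Prefect's actual engine code), the target check before each task run works roughly like this sketch, using an in-memory dict in place of GCS:]

```python
def run_with_target(task_fn, target, store, **context):
    # Resolve the target: it may be a callable receiving runtime context
    path = target(**context) if callable(target) else target.format(**context)
    if path in store:
        return "Cached", store[path]   # skip execution, reuse the stored result
    value = task_fn()
    store[path] = value                # checkpoint the new result
    return "Success", value


store = {}
target = lambda flow_name, task_full_name, **kw: f"{flow_name}/{task_full_name}"

# First run executes the task; second run finds the target and short-circuits
state1, _ = run_with_target(lambda: 42, target, store, flow_name="f", task_full_name="t")
state2, _ = run_with_target(lambda: 42, target, store, flow_name="f", task_full_name="t")
```

This is why a remote run behaving as if `checkpointing=False` never even formats the target: the existence check is skipped entirely, not failing.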
k
I think the syntax is to pass the callable directly. I think the
result_location = create_result_location(MyTask.DEPENDENCY_ARGS)
is creating the value at build time, but you want it to be dynamic at runtime, so you should pass the function itself instead.
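[Editor's note: the build-time vs. runtime distinction in a sketch with hypothetical names — a callable target is resolved fresh with the run's context, whereas a pre-computed string is frozen when the flow object is built:]

```python
def dynamic_target(flow_name, **kwargs):
    # Resolved at runtime: picks up whatever context this particular run provides
    return f"{flow_name}/{kwargs['task_full_name']}"


# Frozen at build time: the value is fixed when the flow is constructed
frozen_target = "my_flow/my_task"


def resolve(target, **context):
    # Mimics how a target may be either a callable or a template string
    return target(**context) if callable(target) else target


assert resolve(dynamic_target, flow_name="other_flow", task_full_name="t") == "other_flow/t"
assert resolve(frozen_target, flow_name="other_flow", task_full_name="t") == "my_flow/my_task"
```

Note the factory pattern in the thread is fine in this respect: `create_result_location(...)` runs at build time but returns a callable, so resolution still happens at runtime.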
No this is not a free tier thing lol. This is in Prefect Core so it should be the same on Cloud and Server.
You can do Result +
cache_for
and
cache_validator
or you can use
target
. From the docs
So you don’t need to pass it in both places.
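[Editor's note: the `cache_for` alternative, in a rough pure-Python sketch rather than Prefect's real implementation — a stored result is reused only while it is younger than the cache window:]

```python
from datetime import datetime, timedelta


def cached_call(fn, cache, key, cache_for):
    # Reuse the stored result only if it is still inside the cache window
    entry = cache.get(key)
    now = datetime.utcnow()
    if entry is not None and now - entry["at"] < cache_for:
        return "Cached", entry["value"]
    value = fn()
    cache[key] = {"value": value, "at": now}
    return "Success", value


cache = {}
s1, _ = cached_call(lambda: 1, cache, "t", timedelta(hours=1))   # computes
s2, _ = cached_call(lambda: 1, cache, "t", timedelta(hours=1))   # within window
s3, _ = cached_call(lambda: 1, cache, "t", timedelta(seconds=-1))  # window already expired
```

Unlike a `target` (a file-existence check), this style keys on time, so it expires automatically.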
When you go to the UI, can you see the Result associated with that task and see the location?
How do you get the
flow_name
and
task_full_name
to pass in to that inner function?