    Michael
    1 year ago
    Hey there! I am trying to set up checkpointing / result saving / output caching, and everything now works correctly when I run flows locally (I set prefect.config.flows.checkpointing = True somewhere in the code). But I can't for the life of me get it to work when I use my Docker agent that's connected to Prefect Cloud. I set --env PREFECT__FLOWS__CHECKPOINTING=true as an env variable when I run the agent, but it never uses any checkpointing logic. Any ideas here? I'm more than happy to provide more info as needed.
    BTW I can tell it isn't working because when I run it locally, a bunch of tasks succeed with final state 'Cached', whereas that doesn't happen with the remote run.
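For reference, the way the flag was being passed looked roughly like the following. This is a sketch: the label value is a placeholder, and it assumes Prefect 1.x, where the agent's --env flag forwards environment variables into the flow-run containers it launches (rather than only setting them on the agent process itself):

```shell
# Sketch: start a Docker agent so the checkpointing flag reaches the
# flow-run containers launched by the agent (Prefect 1.x CLI).
prefect agent docker start \
    --env PREFECT__FLOWS__CHECKPOINTING=true \
    --label my-label
```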
    Kevin Kho
    1 year ago
    Hey @Michael , was still thinking about the earlier msg btw. When running locally, flows.checkpointing is respected, but when running with a backend (Cloud or Server), Prefect automatically sets it to True for you, so this should work. Target is a form of caching, so it makes sense the tasks are cached locally. I am wondering if the target exists in the Docker container.
    Running the flow with DEBUG level logs gives you more info around this btw, but I think this should work. Can you go into the container and check if the files exist?
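For the record, the DEBUG log level can be pushed through the same way as the checkpointing flag, via the Prefect 1.x logging config env var; a sketch:

```shell
# Sketch: raise the log level to DEBUG for flow runs launched by the agent
prefect agent docker start --env PREFECT__LOGGING__LEVEL=DEBUG
```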
    Michael
    1 year ago
    Hmm. The target I’m using is a path in GCS (I use the GCS result), and the docker agent definitely has auth access to gcloud (no errors are thrown). Honestly to me it’s behaving as if checkpointing = False (which is super weird, because I did see that it’s always on with Prefect Cloud). I can run with DEBUG log levels to see if that reveals anything though
    As for the earlier message, that at least I managed to work out! I’d happily post a reply explaining for others, though I’m not sure how many would find it useful
    Kevin Kho
    1 year ago
    Can you respond there and I can archive with Marvin?
    Michael
    1 year ago
    This is what I see, basically nothing extra with DEBUG logging
    It just enters the task run, seemingly without checking the target at all, which is totally not what happens when I run locally
    Kevin Kho
    1 year ago
    Can I see how you set up your target?
    Michael
    1 year ago
    It's pretty convoluted and has some domain logic right now, but TL;DR it's a callable. I was wondering if that's the issue (if somehow at runtime the agent environment doesn't know about the callable). It's something like this:
    def default_result_location(flow_name, task_full_name, parameters, **kwargs):
        all_params = [str(v) for _, v in parameters.items()]
        return f"{flow_name}/{task_full_name}/{cross_session_deterministic_hash('-'.join(all_params))}"
    I only saw this in the docstrings in the Result & Task codebase actually, not documented online (the fact that you can specify these paths as callables that receive the runtime context as kwargs).
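To make the callable idea concrete, here is a minimal self-contained sketch of how such a location callable behaves when invoked with context kwargs. The cross_session_deterministic_hash helper from the thread is stood in for by a hashlib-based function, and the kwargs shown are illustrative, not the full Prefect context:

```python
import hashlib


def cross_session_deterministic_hash(s: str) -> str:
    # Stand-in for the helper mentioned in the thread: an md5 digest is
    # stable across interpreter sessions, unlike the builtin hash().
    return hashlib.md5(s.encode()).hexdigest()


def default_result_location(flow_name, task_full_name, parameters, **kwargs):
    # Build a deterministic path from the flow/task names and parameter values.
    all_params = [str(v) for _, v in parameters.items()]
    return f"{flow_name}/{task_full_name}/{cross_session_deterministic_hash('-'.join(all_params))}"


# Prefect formats the location by calling the callable with the run context;
# simulated here with example values:
loc = default_result_location(
    flow_name="etl",
    task_full_name="extract-1",
    parameters={"date": "2021-09-01"},
)
```

The same parameters always hash to the same path, which is what lets a later run find the earlier run's cached result.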
    Kevin Kho
    1 year ago
    I am positive it can be a callable based on this
    Wow man you’re diving deep into the code! Lol. There is a doc on this one though I think ^
    Michael
    1 year ago
    😅 so I pass default_result_location as both the Task target and the Result location (& BTW this works amazingly when I run locally, I was super impressed. It sped up the caching process a lot compared to implementing it inside the tasks themselves, so I just have to get it sorted in the Cloud).
    Kevin Kho
    1 year ago
    I think the result will use the target already. When you run it, do you see anything on GCS? Or does nothing happen at all?
    Michael
    1 year ago
    It's super confusing, it's just behaving exactly as if checkpointing=False across the board. I can delete the cached files in GCS and try again to see if they are created (so far I was trying to see whether it used the cache that I had created by running locally). I will try now from scratch in the cloud with no saved results.
    Kevin Kho
    1 year ago
    Yeah that would help us see if there is an issue
    Michael
    1 year ago
    It doesn’t touch GCS. It is just ignoring the results completely
    Kevin Kho
    1 year ago
    Can I see your task decorator with the result and target?
    Michael
    1 year ago
    I have two functions that help with generating these (imports added for completeness):
    from typing import Callable, List

    from prefect.engine.result import Result
    from prefect.engine.results import GCSResult


    def create_result_location(task_args: List[str]) -> Callable:
        def result_location(flow_name, task_full_name, **kwargs):
            task_arg_vals = []
            for task_arg_name in task_args:
                task_arg_val = kwargs[task_arg_name]
                if isinstance(task_arg_val, list):
                    task_arg_vals.append("|".join(task_arg_val))
                else:
                    task_arg_vals.append(str(task_arg_val))
            return f"{flow_name}/{task_full_name}/{cross_session_deterministic_hash('-'.join(task_arg_vals))}"

        return result_location


    def create_result_config(result_location: Callable) -> Result:
        return GCSResult(bucket=RESULT_BUCKET, location=result_location)
    and then I use these two functions with every task (I use subclasses not decorators) as follows:
    result_location = create_result_location(MyTask.DEPENDENCY_ARGS)
    my_task = MyTask(
        name="my_task",
        target=result_location,
        result=create_result_config(result_location),
        checkpoint=True,
    )
    my_task.set_upstream(dependency_arg_1, key="dependency_arg_1")
    ...
    MyTask.DEPENDENCY_ARGS is just a list of strings specifying the names of the parameters that should uniquely determine the caching path for that particular task (i.e. allowing me to take a subset of the full parameters dict, the question I was wondering about in a separate thread).
    EDIT: I just tested using a single function for all tasks (i.e. no function generating the callable that is passed to the Task constructor, just one simple callable passed to each). This still doesn't work when I try to run my Flow in the Cloud 🙃. Is it some issue with being on the free tier w Prefect Cloud??
    BTW, you said above "I think the result will use the target already": I just wanted to confirm that tasks do need the target in order for output caching to work. You don't need it to save a result, but it is needed to prevent task execution if a cache exists (although as you can see from my code example, it is a little redundant). It would be cool if there were an arg to the task constructor or something like that, so I don't have to pass the target/location path in two places.
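In the meantime, one way to avoid passing the path in two places is a small kwargs helper. This is just a sketch of the pattern, with names of my own invention (the result factory is passed in as a parameter so the helper stays framework-agnostic and testable):

```python
from typing import Any, Callable, Dict


def caching_kwargs(location: Callable, result_factory: Callable) -> Dict[str, Any]:
    # Build the Task constructor kwargs from a single location callable,
    # so target and the result's location can never drift apart.
    return {
        "target": location,
        "result": result_factory(location),
        "checkpoint": True,
    }


# Usage with the helpers from this thread would then look like:
#   my_task = MyTask(name="my_task",
#                    **caching_kwargs(result_location, create_result_config))
```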
    Kevin Kho
    1 year ago
    I think the syntax is to pass the callable directly. I think result_location = create_result_location(MyTask.DEPENDENCY_ARGS) is creating the value at build time, but you want it to be dynamic at runtime, so you should pass the function instead.
    No, this is not a free tier thing lol. This is in Prefect Core, so it should be the same on Cloud and Server.
    You can do Result + cache_for and cache_validator, or you can use target. From the docs ^
    So you don't need to pass it in both places.
    When you go to the UI, can you see the Result associated with that task and see the location?
    How do you get the flow_name and task_full_name to pass in to that inner function?