Michael
09/09/2021, 6:04 PM
Checkpointing works for me when I run locally (I set prefect.config.flows.checkpointing = True somewhere in the code). But I can't for the life of me get it to work when I use my Docker agent that's connected to Prefect cloud. I set --env PREFECT__FLOWS__CHECKPOINTING=true as an env variable when I run the agent, but it never uses any checkpointing logic. Any ideas here? I'm more than happy to provide more info as needed.

Michael
09/09/2021, 6:05 PM
When I run locally, the tasks show up as 'Cached', whereas that doesn't happen with the remote run.

Kevin Kho
flows.checkpointing is respected, but when running with a backend (Cloud or Server), Prefect automatically sets it to True for you, so this should work.
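An aside on the environment variable in the question: Prefect 1.x maps PREFECT__-prefixed, double-underscore-delimited environment variables onto nested keys of prefect.config, so PREFECT__FLOWS__CHECKPOINTING=true sets config.flows.checkpointing. A toy sketch of just the naming convention (not Prefect's actual loader, which also type-casts values like "true" to booleans):

```python
def env_to_config_key(var_name: str) -> str:
    # Sketch of the PREFECT__SECTION__KEY -> config.section.key convention.
    prefix, *parts = var_name.split("__")
    if prefix != "PREFECT":
        raise ValueError(f"not a Prefect config variable: {var_name!r}")
    return ".".join(p.lower() for p in parts)

print(env_to_config_key("PREFECT__FLOWS__CHECKPOINTING"))  # flows.checkpointing
```

One thing worth checking in a setup like this is whether the variable is actually visible inside the flow-run container itself, not just in the agent's own process.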
Target is a form of caching, so it makes sense that they are cached on local. I am wondering if the target exists on the Docker container.
Michael
09/09/2021, 6:13 PM
def default_result_location(flow_name, task_full_name, parameters, **kwargs):
    all_params = [str(v) for _, v in parameters.items()]
    return f"{flow_name}/{task_full_name}/{cross_session_deterministic_hash('-'.join(all_params))}"
I only saw this in the docstrings in the Result & Task codebase actually, not documented online (the fact that you can specify these paths as callables that receive the runtime context as kwargs).
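To make that docstring behavior concrete: at runtime, a callable location is resolved by calling it with the contents of the run context as keyword arguments. A rough simulation with a hand-built context dict (the hashing helper is replaced by a plain join here, and the context keys are illustrative):

```python
def default_result_location(flow_name, task_full_name, parameters, **kwargs):
    # Same shape as the snippet above, minus the hashing helper.
    all_params = [str(v) for _, v in parameters.items()]
    return f"{flow_name}/{task_full_name}/{'-'.join(all_params)}"

# Hypothetical stand-in for the runtime context:
ctx = {
    "flow_name": "my-flow",
    "task_full_name": "my_task",
    "parameters": {"day": "2021-09-09", "region": "eu"},
    "map_index": None,  # any extra context keys are absorbed by **kwargs
}
print(default_result_location(**ctx))  # my-flow/my_task/2021-09-09-eu
```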
Michael
09/09/2021, 6:16 PM
I use the same callable for the target and the Result location. (& BTW this works amazingly when I run locally, I was super impressed. It sped up the caching process a lot compared to the implementation inside the tasks themselves, so I just have to get it sorted in the Cloud.)
Michael
09/09/2021, 6:20 PM
It behaves as if checkpointing=False across the board. I can delete the cached files in GCS and try again to see if they are created (so far I was trying to see whether it used the cache that I had created by running locally). I will try now from scratch in the cloud with no saved results.
Michael
09/10/2021, 6:09 AM
def create_result_location(task_args: List[str]) -> Callable:
    def result_location(flow_name, task_full_name, **kwargs):
        task_arg_vals = []
        for task_arg_name in task_args:
            task_arg_val = kwargs[task_arg_name]
            if isinstance(task_arg_val, list):
                task_arg_vals.append("|".join(task_arg_val))
            else:
                task_arg_vals.append(str(task_arg_val))
        return f"{flow_name}/{task_full_name}/{cross_session_deterministic_hash('-'.join(task_arg_vals))}"
    return result_location

def create_result_config(result_location: Callable) -> Result:
    return GCSResult(bucket=RESULT_BUCKET, location=result_location)
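(cross_session_deterministic_hash isn't shown in the thread; the name suggests it deliberately avoids Python's built-in hash(), which is salted per process via PYTHONHASHSEED and so would produce different cache paths in the agent's container than locally. A plausible stand-in based on a real digest:)

```python
import hashlib

def cross_session_deterministic_hash(s: str) -> str:
    # Hypothetical implementation: an MD5 digest is stable across
    # processes and machines, unlike the salted built-in hash().
    return hashlib.md5(s.encode("utf-8")).hexdigest()

a = cross_session_deterministic_hash("2021-09-09-eu")
b = cross_session_deterministic_hash("2021-09-09-eu")
assert a == b and len(a) == 32  # deterministic, fixed-width hex
```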
and then I use these two functions with every task (I use subclasses, not decorators) as follows:
result_location = create_result_location(MyTask.DEPENDENCY_ARGS)
my_task = MyTask(
    name="my_task",
    target=result_location,
    result=create_result_config(result_location),
    checkpoint=True,
)
my_task.set_upstream(dependency_arg_1, key="dependency_arg_1")
...
MyTask.DEPENDENCY_ARGS is just a list of strings specifying the names of the parameters that should uniquely determine the caching path for that particular task (i.e. allowing me to take a subset of the full parameters dict, the question I was wondering about in a separate thread).
EDIT: I just tested using a single function for all tasks (i.e. no function generating the callable that is passed to the Task constructor, but just one simple callable passed to each). This still doesn't work when I try to run my Flow in the Cloud 🙃. Is it some issue with being on the free tier w/ Prefect Cloud??
Kevin Kho
result_location = create_result_location(MyTask.DEPENDENCY_ARGS)
is creating the value during build time, but you want it to be dynamic at runtime, so you should pass the function instead.
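The build-time vs. run-time distinction in plain Python: passing the function object defers evaluation until the framework calls it with runtime values, whereas calling it while building the flow freezes a single string into the registered flow (names below are illustrative):

```python
def create_result_location(task_args):
    # Simplified version of the factory above, without the hashing helper.
    def result_location(flow_name, task_full_name, **kwargs):
        return f"{flow_name}/{task_full_name}/{'-'.join(task_args)}"
    return result_location

location_fn = create_result_location(["dep_a", "dep_b"])

# Passed as-is, it stays a callable that can be invoked later with
# runtime values for flow_name / task_full_name:
assert callable(location_fn)

# Calling it at build time bakes the value in immediately:
frozen = location_fn(flow_name="f", task_full_name="t")
assert frozen == "f/t/dep_a-dep_b"
```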
You can use cache_for and cache_validator, or you can use target. From the docs:
flow_name and task_full_name to pass in to that inner function?