# ask-community
m
Hey all! I am looking for a way to use file targets for output caching, but parameterised by the upstream dependencies to a specific task. In other words, I don’t care exactly what the GCSResult file path looks like, but I need 1) the file path to correspond to that task’s upstream dependencies (such that one task could have many possible output results), and 2) all of these different results to be cacheable at any one time. Has anyone done something like this? NB: what would be amazing is something like an {upstream} context option for a given task that somehow serializes the upstream dependencies into the result / target file name at run time. I know there is a {parameters} context value (so I could use this in the worst case), but not every task in a particular flow depends on all of that flow’s parameters, so this would result in unnecessary computation. Right now I implement something like this inside the task’s run logic itself, but I imagine it’s a helluva lot slower (and a lot less clean) than letting the orchestrator skip tasks, rather than entering every task just to perform a file-existence check and exit. Cheers for any help!
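To illustrate, the in-task check I mean looks roughly like this (a simplified sketch; gcs_path_for and expensive_computation are hypothetical stand-ins):
Copy code
import pickle

import gcsfs
from prefect import task

fs = gcsfs.GCSFileSystem()

@task
def my_task(dependency_arg_1, dependency_arg_2):
    # gcs_path_for is a hypothetical helper that derives a bucket path
    # from the upstream values
    path = gcs_path_for(dependency_arg_1, dependency_arg_2)
    # the cache check lives inside the task body, so the orchestrator still
    # has to enter and run the task before it can short-circuit
    if fs.exists(path):
        with fs.open(path, "rb") as f:
            return pickle.load(f)
    value = expensive_computation(dependency_arg_1, dependency_arg_2)  # hypothetical
    with fs.open(path, "wb") as f:
        pickle.dump(value, f)
    return value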
Managed to figure out a solution here (I think). I’ll happily explain if anyone else is stuck 🙂
j
Would love to hear your solution. 😃😃
b
+1 interested in this space
I'm doing something similar - all of the inputs (and the flow parameters) are available when formatting the target filename, so you could pass a callable as your target formatter and compute some "input hash" for the filename.
👍 1
Actually this would be easier if you used a custom Result class, something like...
Copy code
from typing import Any, Dict

import fsspec
from prefect.engine.result import Result

# _extract_value, _find_task_func and tokenize_func are helpers not shown
# here (see the note and sketch below)


class TokenizedResult(Result):
    """Result whose storage path is a deterministic token of the task's inputs."""

    def __init__(self, root: str, protocol: str = "file", **kwargs):
        self.protocol = protocol
        self.root = root
        self.fs = fsspec.filesystem(protocol)
        super().__init__(**kwargs)

    @staticmethod
    def _inputs_to_kwargs(inputs):
        # unwrap each upstream edge into the plain value passed to the task
        return {k: _extract_value(v) for k, v in inputs.items()}

    @staticmethod
    def _context_to_func(**context):
        # look up the underlying task function from the run context
        return _find_task_func(flow_name=context["flow_name"], task_name=context["task_name"])

    def _path(self, inputs, **kwargs):
        """Determine a tokenized path for this task and inputs."""
        task_func = self._context_to_func(**kwargs)
        module = task_func.__module__
        func_kwargs = self._inputs_to_kwargs(inputs=inputs)
        key = tokenize_func(func=task_func)(**func_kwargs)
        task_name = kwargs["task_name"]
        return f"{self.root}/{module}.{task_name}/{key}"

    def format(self, **kwargs: Any) -> "Result":
        # drop the {filename} placeholder, then fill the remaining template fields
        new = self.copy()
        new.location = new.location.replace("{filename}", "")
        new.location = new.location.format(**kwargs)
        return new

    def exists(self, location: str, inputs: Dict, **kwargs: Any) -> bool:
        path = self._path(inputs=inputs, **kwargs)
        return self.fs.exists(path=path)

    def read(self, location: str, inputs: Dict, **kwargs) -> "Result":
        path = self._path(inputs=inputs, **kwargs)
        with self.fs.open(path, "rb") as f:
            return self.serializer.deserialize(f.read())

    def write(self, value: Any, inputs: Dict, **kwargs: Any) -> "Result":
        path = self._path(inputs=inputs, **kwargs)
        with self.fs.open(path, "wb") as f:
            f.write(self.serializer.serialize(value))
        return value
There’s a couple of missing implementations here (_extract_value, _find_task_func and tokenize_func), but hopefully it gives you an idea of how it can be done
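For example, tokenize_func could be sketched along these lines, leaning on dask’s deterministic tokenize (an illustration under that assumption, not the exact implementation):
Copy code
import inspect

from dask.base import tokenize


def tokenize_func(func):
    # include the source so the token changes when the function body changes
    source = inspect.getsource(func)

    def _token(**kwargs):
        # dask.base.tokenize produces a deterministic, cross-session hash
        # for most common value types
        return tokenize(func.__module__, func.__name__, source, kwargs)

    return _token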
m
Your first comment is exactly what I did, Brad, and it solved the problem for me (especially since I’m using the GCSResult, and so have no need to override its lower-level exists/read/write functions). I did something like the following:
Copy code
from typing import Callable, List

# cross_session_deterministic_hash is a user-defined stable hash; see the
# sketch further down


def create_result_location(task_args: List[str]) -> Callable:
    def result_location(flow_name, task_full_name, **kwargs):
        task_arg_vals = []
        for task_arg_name in task_args:
            task_arg_val = kwargs[task_arg_name]
            if isinstance(task_arg_val, list):
                task_arg_vals.append("|".join(task_arg_val))
            else:
                task_arg_vals.append(str(task_arg_val))
        return f"{flow_name}/{task_full_name}/{cross_session_deterministic_hash('-'.join(task_arg_vals))}"

    return result_location
So this function takes in the string names of a particular task’s dependencies and returns a callable that creates a hash from the values of those parameters. It’s then used as follows:
Copy code
# build a location formatter from the names of this task's upstream args
result_location = create_result_location(MyTask.DEPENDENCY_ARGS)
my_task = MyTask(
    name="my_task",
    target=result_location,  # callable targets are formatted with the task's inputs at runtime
    result=GCSResult(bucket=GCS_BUCKET, location=result_location),
    checkpoint=True,
)
# the upstream key must match a name listed in DEPENDENCY_ARGS
my_task.set_upstream(dependency_arg_1, key="dependency_arg_1")
...
where DEPENDENCY_ARGS is just a list of string names of parameters that should uniquely determine a particular task’s output cache location.
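One note on cross_session_deterministic_hash: Python’s built-in hash() is salted per interpreter session, so the helper needs a stable content-based digest. Something like this (a minimal sketch):
Copy code
import hashlib


def cross_session_deterministic_hash(value: str) -> str:
    # built-in hash() is randomized per session (PYTHONHASHSEED), so use a
    # digest that is stable across runs and machines
    return hashlib.sha256(value.encode("utf-8")).hexdigest()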
b
yep nice one!