Michael
09/09/2021, 2:45 PM
Is there an {upstream} context option for a given task that somehow serializes the upstream dependencies into the result / target file name at run time? I know there is a {parameters} context value (so I could use this in the worst case), but not every task in a particular flow depends on all of that flow's parameters, so this would result in unnecessary computation.
Right now I implement something like this inside the task's run logic itself, but I imagine it's a helluva lot slower (and also a lot less clean) than letting the orchestrator deal with skipping tasks, rather than entering every task to perform a file existence check before exiting.
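Roughly, the in-task workaround looks something like this (a minimal sketch only; the bucket, path layout, and expensive_computation are illustrative, not the actual code):

import pickle
import fsspec

def run(dep_a: str, dep_b: str):
    # Illustrative only: key the cache path on the upstream argument values,
    # and check for an existing result before doing any work.
    fs = fsspec.filesystem("gcs")  # requires gcsfs
    path = f"my-bucket/my_task/{dep_a}-{dep_b}.pkl"
    if fs.exists(path):
        with fs.open(path, "rb") as f:
            return pickle.load(f)  # cache hit: exit early
    result = expensive_computation(dep_a, dep_b)  # hypothetical task body
    with fs.open(path, "wb") as f:
        pickle.dump(result, f)
    return result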
Cheers for any help!
Michael
09/09/2021, 6:12 PM
John Shearer
09/09/2021, 8:40 PM
Brad
09/09/2021, 9:42 PM
Brad
09/09/2021, 9:47 PM
Brad
09/09/2021, 9:57 PM
import fsspec
from typing import Any, Dict
from prefect.engine.result import Result

class TokenizedResult(Result):
    def __init__(self, root: str, protocol: str = "file", **kwargs):
        self.protocol = protocol
        self.root = root
        self.fs = fsspec.filesystem(protocol)
        super().__init__(**kwargs)

    @staticmethod
    def _inputs_to_kwargs(inputs):
        # _extract_value is one of the helpers not shown here
        return {k: _extract_value(v) for k, v in inputs.items()}

    @staticmethod
    def _context_to_func(**context):
        # _find_task_func (not shown) looks up the task's function by name
        return _find_task_func(flow_name=context["flow_name"], task_name=context["task_name"])

    def _path(self, inputs, **kwargs):
        """Determine a tokenized path for this task and inputs."""
        task_func = self._context_to_func(**kwargs)
        module = task_func.__module__
        func_kwargs = self._inputs_to_kwargs(inputs=inputs)
        # tokenize_func (not shown) deterministically hashes the call
        key = tokenize_func(func=task_func)(**func_kwargs)
        task_name = kwargs["task_name"]
        return f"{self.root}/{module}.{task_name}/{key}"

    def format(self, **kwargs: Any) -> "Result":
        new = self.copy()
        # strip the {filename} placeholder, then fill the remaining
        # template fields from the run context
        new.location = new.location.replace("{filename}", "")
        new.location = new.location.format(**kwargs)
        return new

    def exists(self, location: str, inputs: Dict, **kwargs: Any) -> bool:
        path = self._path(inputs=inputs, **kwargs)
        return self.fs.exists(path=path)

    def read(self, location: str, inputs: Dict, **kwargs) -> "Result":
        path = self._path(inputs=inputs, **kwargs)
        with self.fs.open(path, "rb") as f:
            return self.serializer.deserialize(f.read())

    def write(self, value: Any, inputs: Dict, **kwargs: Any) -> "Result":
        path = self._path(inputs=inputs, **kwargs)
        with self.fs.open(path, "wb") as f:
            f.write(self.serializer.serialize(value))
        return value
There's a couple of missing implementations here (_extract_value, _find_task_func, tokenize_func), but hopefully it gives you an idea of how it can be done.
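For reference, one possible way to fill in the tokenize_func gap is with dask's deterministic tokenize; this is only a sketch of the idea under that assumption, not Brad's actual helper:

from dask.base import tokenize

def tokenize_func(func):
    # Return a callable that deterministically hashes the function's
    # qualified name together with its keyword arguments.
    def _token(**kwargs):
        return tokenize(f"{func.__module__}.{func.__qualname__}", **kwargs)
    return _token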
Michael
09/10/2021, 7:08 AM
from typing import Callable, List

def create_result_location(task_args: List[str]) -> Callable:
    def result_location(flow_name, task_full_name, **kwargs):
        task_arg_vals = []
        for task_arg_name in task_args:
            task_arg_val = kwargs[task_arg_name]
            if isinstance(task_arg_val, list):
                task_arg_vals.append("|".join(task_arg_val))
            else:
                task_arg_vals.append(str(task_arg_val))
        # cross_session_deterministic_hash: user-defined helper (sketched below)
        return f"{flow_name}/{task_full_name}/{cross_session_deterministic_hash('-'.join(task_arg_vals))}"
    return result_location
So this function takes in the string names of the dependencies of a particular task and returns a callable that creates a hash from the values of those parameters. It's then used as follows:
result_location = create_result_location(MyTask.DEPENDENCY_ARGS)

my_task = MyTask(
    name="my_task",
    target=result_location,
    result=GCSResult(bucket=GCS_BUCKET, location=result_location),
    checkpoint=True,
)
my_task.set_upstream(dependency_arg_1, key="dependency_arg_1")
...
where DEPENDENCY_ARGS is just a list of the string names of parameters that should uniquely determine a particular task's output cache location.
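For reference, Python's built-in hash() is salted per interpreter session for strings, so a helper like this has to be built on a stable digest; a minimal sketch of such a cross_session_deterministic_hash (an assumption, not necessarily the actual implementation):

import hashlib

def cross_session_deterministic_hash(value: str) -> str:
    # Built-in hash() is randomized per session for str, so use a
    # stable digest to keep cache locations reproducible across runs.
    return hashlib.md5(value.encode("utf-8")).hexdigest()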
Brad
09/10/2021, 9:33 PM