# ask-community
b
if I return a lot of data serialized with GCS results, is it expected that the only thing dask will transfer is the result metadata as opposed to the data itself? say for example I do
```
lots_of_dataframes_in_gcs = task(
    process_data,
    result=GCSResult(
        serializer=PandasSerializer(file_type="parquet"),
    ),
).map(input_files)

task(notify_data_is_ready).set_dependencies(upstream_tasks=[lots_of_dataframes_in_gcs])
```
I don't actually want the data passed to the downstream task, just to enforce the ordering. But based on my exploding dask worker memory it seems that the downstream task is actually collecting all of the outputs themselves(!), even though each transferred object shows up in the dask dashboard as a 48-byte `Cached` state object (so the memory is 99.999% untracked by dask). I can imagine a workaround that involves skipping Results altogether and returning True or a file path explicitly, but it seems like this might be a bug and the `state.result` should be getting purged before we ship it around? @Chris White @Zanie or anyone else have thoughts on whether this is expected?
c
Interesting -- the data is transferred because it is attached to the upstream state object
b
> because it is attached to the upstream state object
by necessity, or just because it happens to still be there...? seems like the Result would be serving that purpose if one is set (if not, then obviously it needs to hold on to the data still).
c
in-memory movement of data without the back-and-forth trip to storage is typically more of a feature than a nuisance, so attaching the data directly to states is more necessity than not; do any downstream tasks of this one use the data directly?
b
no, it's quite impossible to load all of the data on one worker (many TB), so all the job is doing downstream is loading to BigQuery and notifying on success. I had actually started hacking together my own Result for this use case but never quite got it working; I had hoped to just stick with the built-in stuff and use `set_dependencies`, but that doesn't actually work:
```
from prefect.engine.result import Result
from prefect.engine.results import GCSResult


class GCSReference(GCSResult):
    """Prefect Result class that returns a path to the data instead of the data itself.

    Useful for very large results that can't be loaded locally, e.g. mapped task outputs that will
    be loaded into BigQuery without leaving GCS.
    """

    def read(self, location: str) -> Result:
        # skip downloading from GCS entirely: hand back the location as the "value"
        new = self.copy()
        new.location = location
        new.value = location
        return new
```
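the idea was that it would slot in where GCSResult does above, something like this (the bucket and location template here are made up, not from the real flow):
```
lots_of_dataframes_in_gcs = task(
    process_data,
    result=GCSReference(
        bucket="my-bucket",  # placeholder bucket
        serializer=PandasSerializer(file_type="parquet"),
        location="processed/{flow_run_id}/{map_index}.parquet",  # placeholder template
    ),
).map(input_files)
```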
c
any reason you can't instantiate the `GCSResult` class within the task and call its write method?
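something roughly like this (the bucket name, location template, and transform helper are placeholders, not from your actual flow):
```
import prefect
from prefect import task
from prefect.engine.results import GCSResult
from prefect.engine.serializers import PandasSerializer


@task
def process_data(input_file):
    df = transform(input_file)  # placeholder for the real per-file work
    result = GCSResult(
        bucket="my-bucket",  # placeholder bucket
        serializer=PandasSerializer(file_type="parquet"),
        location="processed/{flow_run_id}/{map_index}.parquet",  # placeholder template
    )
    # write() formats the location template from the kwargs, serializes the
    # dataframe, uploads it to GCS, and returns a Result with .location set
    written = result.write(df, **prefect.context)
    return written.location  # only a small path string gets shipped downstream
```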
b
I wouldn't get checkpointing if I don't use a real Result object though, right?
but yeah, I used to do something basically like that, and just return True inside the task, which gets "checkpointed" to basically a sentinel file. the other complication I ran into is that it didn't seem like I could easily access the result location from inside the task, so I had to abstract it out into a separate variable and do `.format(**prefect.context)` myself, which was a bit of a pain
ahh ok, I think I understand though: you're saying if I write to the result path manually and just return True, downstream tasks will receive True as the input but the data will be correct in GCS. that sounds like the best compromise
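a minimal sketch of that compromise end to end (load_to_bigquery, send_notification, and input_files are placeholders; process_data is the mapped task that writes to GCS itself and returns only the path):
```
from prefect import Flow, task

# placeholder list of raw inputs
input_files = ["gs://my-bucket/raw/part-0.csv", "gs://my-bucket/raw/part-1.csv"]


@task
def notify_data_is_ready(gcs_paths):
    # gcs_paths is just a list of small strings; the dataframes never leave GCS
    load_to_bigquery(gcs_paths)  # placeholder: e.g. a BigQuery load job reading straight from GCS
    send_notification("data is ready")  # placeholder notification hook


with Flow("gcs-to-bigquery") as flow:
    paths = process_data.map(input_files)  # each mapped run returns a gs:// path, not a dataframe
    notify_data_is_ready(paths)
```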
c
Yea exactly exactly