Brett Naul
08/25/2021, 5:07 PM
lots_of_dataframes_in_gcs = task(
    process_data,
    result=GCSResult(
        serializer=PandasSerializer(file_type="parquet")
    )
).map(input_files)
task(notify_data_is_ready).set_dependencies(upstream_tasks=[lots_of_dataframes_in_gcs])
I don't actually want the data passed to the downstream task, just to enforce the ordering. But based on my exploding dask worker memory, it seems that the downstream task is actually collecting all of the outputs themselves(!), even though each transferred object shows up in the dask dashboard as a 48-byte Cached state object (so the memory is 99.999% untracked by dask).
I can imagine a workaround that involves just skipping Results altogether and returning True or a file path explicitly, but it seems like this might be a bug and the state.result should be getting purged before we ship it around? @Chris White @Zanie or anyone else have thoughts on whether this is expected?
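For concreteness, a minimal sketch of the workaround described above (skipping Results and returning only a GCS path), assuming Prefect 1.x APIs; the bucket name, file list, and the pd.read_csv stand-in are illustrative, and writing to gs:// from pandas assumes gcsfs is installed:

import pandas as pd
from prefect import Flow, task

input_files = ["a.csv", "b.csv"]                             # illustrative inputs

@task
def process_data_to_path(input_file: str) -> str:
    df = pd.read_csv(input_file)                             # stand-in for the real process_data logic
    path = f"gs://my-bucket/outputs/{input_file}.parquet"    # illustrative bucket/layout
    df.to_parquet(path)                                      # pandas writes to gs:// via gcsfs
    return path                                              # only a small string is passed downstream

@task
def notify_data_is_ready(paths: list) -> None:
    print(f"{len(paths)} files are ready")

with Flow("lots-of-dataframes") as flow:
    paths = process_data_to_path.map(input_files)
    notify_data_is_ready(paths)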
Chris White

Brett Naul
08/25/2021, 5:19 PM
"because it is attached to the upstream state object" by necessity, or just because it happens to still be there...? Seems like the Result would be serving that purpose if one is set (if not, then obviously it needs to hold on to the data still).
Chris White

Brett Naul
08/25/2021, 6:15 PM
... set_dependencies, but that doesn't actually work
+class GCSReference(GCSResult):
+    """Prefect Result class that returns a path to the data instead of the data itself.
+
+    Useful for very large results that can't be loaded locally, e.g. mapped task outputs that will
+    be loaded into BigQuery without leaving GCS.
+    """
+
+    def read(self, location: str) -> Result:
+        new = self.copy()
+        new.location = location
+        new.value = location
+        return new
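A sketch of how this subclass might replace GCSResult in the snippet at the top of the thread (reusing the names defined there), assuming Prefect 1.x result semantics; the bucket and location template are illustrative:

lots_of_dataframes_in_gcs = task(
    process_data,
    result=GCSReference(
        bucket="my-bucket",                                  # illustrative
        location="{flow_run_id}/{map_index}.parquet",        # rendered from prefect.context at write time
        serializer=PandasSerializer(file_type="parquet"),
    ),
).map(input_files)

# Downstream hydration of this result now yields only the GCS location string,
# so the dataframes never need to be deserialized on the dask workers.
task(notify_data_is_ready).set_dependencies(upstream_tasks=[lots_of_dataframes_in_gcs])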
Chris White
... GCSResult class within the task and call its write method?
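A rough sketch of that suggestion, instantiating the result inside the task body; it assumes GCSResult.write renders the location template from the keyword arguments it receives, and the bucket, location template, and pd.read_csv stand-in are illustrative:

import pandas as pd
import prefect
from prefect import task
from prefect.engine.results import GCSResult
from prefect.engine.serializers import PandasSerializer

@task(checkpoint=False)                                      # skip Prefect's own result persistence for this task
def process_data(input_file: str) -> str:
    df = pd.read_csv(input_file)                             # stand-in for the real transformation
    result = GCSResult(
        bucket="my-bucket",
        location="{flow_run_id}/{map_index}.parquet",
        serializer=PandasSerializer(file_type="parquet"),
    )
    written = result.write(df, **prefect.context)            # uploads the dataframe and fills in the template
    return written.location                                  # hand back only the GCS path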
Brett Naul
08/25/2021, 6:27 PM

Brett Naul
08/25/2021, 6:31 PM
... .format(**prefect.context) myself, which was a bit of a pain
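Presumably something along these lines, where the templated location has to be rendered by hand from prefect.context (the template itself is illustrative):

import prefect

location_template = "{flow_run_id}/{task_run_id}/{map_index}.parquet"
location = location_template.format(**prefect.context)       # e.g. "abc123/def456/3.parquet"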
Brett Naul
08/25/2021, 6:41 PM

Chris White