Brett Naul
08/10/2021, 8:18 PM
The to_dict() approach that you mentioned in this thread is similar to what we're doing (we return list(df.itertuples()) instead), but it's not ideal for the same reasons that PandasSerializer is nice in general compared to using cloudpickle. I can't really think of anything off the top of my head though that wouldn't require messing with the .map internals (cc @Chris White @Zanie too in case y'all have any clever ideas). Can also make a discussion on GitHub if that's preferable.

Kevin Kho
So you return list(df.itertuples()), but then you want it to be serialized as a whole DataFrame later on? I presume each record is saved independently with PickleSerializer during your map operation?

Brett Naul
08/10/2021, 11:30 PM
@task(serializer=PandasSerializer(format="parquet"))
def load():
    return pd.DataFrame({"name": ["A", "B"], "x": [1, 2]})

@task
def apply(row):
    return 2 * row["x"]

with Flow("apply_map") as f:
    df = load()
    outputs = apply.map(df, axis=1)  # or whatever
right now you can't return the dataframe from load(); you have to return some kind of iterable instead (e.g. list(df.itertuples()), but df.to_dict(orient="records") would work as well), which would get serialized via cloudpickle. it's not the end of the world, but the reason PandasSerializer is nice in the first place is that the intermediate results are interoperable with other standard tools, whereas unpickling and converting back to a dataframe is a bit gross (and obviously very inefficient when the data gets large).
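for reference, the workaround today looks roughly like this (just a sketch; the flow and task names are illustrative):

import pandas as pd
from prefect import task, Flow

@task
def load():
    df = pd.DataFrame({"name": ["A", "B"], "x": [1, 2]})
    # return plain records so the default PickleSerializer can handle them
    return df.to_dict(orient="records")  # or list(df.itertuples())

@task
def apply(record):
    return 2 * record["x"]

with Flow("apply_map_workaround") as f:
    outputs = apply.map(load())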
Kevin Kho

You can subclass PandasSerializer and make your own, but it shouldn't be too much work. Maybe something like the code below. All it needs to do is use the super methods, add one line to reconstruct the DataFrame from list(df.itertuples()) before serializing, and then deconstruct back into list(df.itertuples()) after deserializing. You can then use the serializer in the task.
from typing import List, Tuple

import pandas as pd
from prefect.engine.serializers import PandasSerializer


class MyPandasSerializer(PandasSerializer):
    def __init__(
        self,
        file_type: str,
        deserialize_kwargs: dict = None,
        serialize_kwargs: dict = None,
    ) -> None:
        super().__init__(
            file_type,
            deserialize_kwargs=deserialize_kwargs,
            serialize_kwargs=serialize_kwargs,
        )

    def serialize(self, value: List[Tuple]) -> bytes:
        """
        Rebuild a DataFrame from the row tuples and serialize it to bytes.

        Args:
            - value (List[Tuple]): the result of list(df.itertuples())

        Returns:
            - bytes: the serialized value
        """
        # recreate the DataFrame, dropping the index column added by itertuples()
        df = pd.DataFrame(value).drop("Index", axis=1)
        return super().serialize(df)

    def deserialize(self, value: bytes) -> List[Tuple]:
        """
        Deserialize bytes to a DataFrame and return it as row tuples.

        Args:
            - value (bytes): the value to deserialize

        Returns:
            - List[Tuple]: the result of list(df.itertuples())
        """
        df = super().deserialize(value)
        return list(df.itertuples())
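Attaching it to a task would look something like this (a sketch, assuming Prefect 1.x Results; LocalResult and the directory are just placeholders for whichever Result type you actually use, and checkpointing still needs to be enabled when running locally):

import pandas as pd
from prefect import task
from prefect.engine.results import LocalResult

@task(
    checkpoint=True,
    result=LocalResult(dir="./results", serializer=MyPandasSerializer("parquet")),
)
def load():
    df = pd.DataFrame({"name": ["A", "B"], "x": [1, 2]})
    return list(df.itertuples())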
You must be doing some funky things to need observability per row 😆. I don’t recommend the map over rows if the process is deterministic and highly unlikely to fail, but I’m sure you know what you are doing.

Brett Naul
08/11/2021, 11:45 AM
path                    start  end
gs://.../file0.parquet  0      10
gs://.../file0.parquet  10     20
gs://.../file1.parquet  0      10
gs://.../file1.parquet  10     20
and the function being mapped looks like
def process(chunk):
    path, start, end = chunk
    df = pd.read_parquet(path).iloc[start:end]
    do_stuff(df)
this is just one case, but in general I'd like to use results that let someone easily open and inspect the contents for debugging; other people on my team understandably find pd.read_csv(result.location) more intuitive than import gcsfs; pd.DataFrame(cloudpickle.loads(gcsfs.GCSFileSystem().open(result.location, "rb").read())), plus you can just head() the file and so on.
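concretely, the two debugging experiences look something like this (a sketch; `result` stands in for the task run's Result, whose .location points at the stored file):

import cloudpickle
import gcsfs
import pandas as pd

# with a Parquet/CSV intermediate, anyone can open it directly
df = pd.read_parquet(result.location)  # or pd.read_csv(result.location)
df.head()

# with a cloudpickle intermediate on GCS, rebuilding the DataFrame takes more ceremony
with gcsfs.GCSFileSystem().open(result.location, "rb") as fh:
    df = pd.DataFrame(cloudpickle.loads(fh.read()))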
subclassing PandasSerializer seems like an interesting approach though, will play around with that some. thanks!

Kevin Kho
Brett Naul
08/11/2021, 4:56 PM