# ask-community
Brett Naul:
@Kevin Kho just curious, do you think there's a potential way to leverage the PandasSerializer from https://github.com/PrefectHQ/prefect/pull/3020 in conjunction with the question here? the `to_dict()` approach that you mentioned in this thread is similar to what we're doing (we return `list(df.itertuples())` instead) but it's not ideal, for the same reasons that `PandasSerializer` is nice in general compared to using cloudpickle. I can't really think of anything off the top of my head though that wouldn't require messing with the `.map` internals (cc @Chris White @Zanie too in case y'all have any clever ideas). can also make a discussion on GitHub if that's preferable
Kevin Kho:
Hey @Brett Naul, I’ve been re-reading this and am a bit unclear. You use `list(df.itertuples())`, but then you want it to be serialized as a whole DataFrame later on? I presume each record is saved independently with PickleSerializer during your `map` operation?
Brett Naul:
yeah more specifically it would be cool to do something like
```python
@task(serializer=PandasSerializer(format="parquet"))
def load():
    return pd.DataFrame({"name": ["A", "B"], "x": [1, 2]})

@task
def apply(row):
    return 2 * row["x"]

with Flow("apply_map") as f:
    df = load()
    outputs = apply.map(df, axis=1)  # or whatever
```
right now you can't return the dataframe from `load()`, you have to return some kind of iterable instead (e.g. `list(df.itertuples())`, but `df.to_dict(orient="records")` would work as well), which would get serialized via cloudpickle. it's not the end of the world, but the reason PandasSerializer is nice in the first place is that the intermediate results are interoperable with other standard tools, whereas unpickling and converting back to a dataframe is a bit gross (and obviously very inefficient when the data gets large)
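(For reference, a minimal sketch of the workaround being described, assuming Prefect 1.x-style `@task`/`Flow`/`.map` semantics; the task names are illustrative, not from the thread:)
```python
import pandas as pd
from prefect import task, Flow

@task
def load():
    df = pd.DataFrame({"name": ["A", "B"], "x": [1, 2]})
    # can't hand the DataFrame itself to .map, so return plain records;
    # these get checkpointed with the default cloudpickle-based PickleSerializer
    return df.to_dict(orient="records")

@task
def apply(row):
    # each mapped child receives one record dict
    return 2 * row["x"]

with Flow("apply_map_workaround") as f:
    records = load()
    outputs = apply.map(records)
```
The records here end up checkpointed as a pickle blob, which is exactly the interoperability gap being described.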
Kevin Kho:
Ok so this can work. I think at this point you need to subclass `PandasSerializer` and make your own, but it shouldn’t be too much work. Maybe something like the code below. All it needs to do is basically use the `super` methods, adding one line to reconstruct the DataFrame from `list(df.itertuples())` before serializing, and one line to deconstruct it back into `list(df.itertuples())` after deserializing. You can then use the serializer in the task.
```python
from typing import List, Tuple

import pandas as pd
from prefect.engine.serializers import PandasSerializer


class MyPandasSerializer(PandasSerializer):
    def __init__(
        self,
        file_type: str,
        deserialize_kwargs: dict = None,
        serialize_kwargs: dict = None,
    ) -> None:
        super().__init__(file_type, deserialize_kwargs, serialize_kwargs)

    def serialize(self, value: List[Tuple]) -> bytes:
        """
        Serialize the result of list(df.itertuples()) to bytes.

        Args:
            - value (List[Tuple]): the result of list(df.itertuples())

        Returns:
            - bytes: the serialized value
        """
        # rebuild the DataFrame, dropping the "Index" column that itertuples adds
        df = pd.DataFrame(value).drop("Index", axis=1)
        return super().serialize(df)

    def deserialize(self, value: bytes) -> List[Tuple]:
        """
        Deserialize bytes back into row tuples.

        Args:
            - value (bytes): the value to deserialize

        Returns:
            - List[Tuple]: the result of list(df.itertuples())
        """
        df = super().deserialize(value)
        return list(df.itertuples())
```
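(Untested, but wiring that into a flow might look roughly like this, assuming Prefect 1.x-style results; `LocalResult` and the directory are just stand-ins for whatever result class is actually in use, e.g. `GCSResult`:)
```python
import pandas as pd
from prefect import task, Flow
from prefect.engine.results import LocalResult

@task(
    checkpoint=True,  # checkpointing must also be enabled (PREFECT__FLOWS__CHECKPOINTING=true)
    result=LocalResult(dir="./results", serializer=MyPandasSerializer("parquet")),
)
def load():
    df = pd.DataFrame({"name": ["A", "B"], "x": [1, 2]})
    # hand back an iterable so .map can fan out over rows,
    # but the checkpoint written to disk is a real parquet file
    return list(df.itertuples())

@task
def apply(row):
    # each mapped child gets one itertuples namedtuple
    return 2 * row.x

with Flow("apply_map") as f:
    rows = load()
    outputs = apply.map(rows)
```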
You must be doing some funky things to need observability per row 😆. I don’t recommend mapping over rows if the process is deterministic and highly unlikely to fail, but I’m sure you know what you are doing.
Brett Naul:
it doesn't seem all that funky to me 🙂 my actual dataframe looks like
```
                   path  start  end
 gs://.../file0.parquet      0   10
 gs://.../file0.parquet     10   20
 gs://.../file1.parquet      0   10
 gs://.../file1.parquet     10   20
```
and the function being mapped looks like
```python
def process(chunk):
    path, start, end = chunk
    df = pd.read_parquet(path).iloc[start:end]
    do_stuff(df)
```
this is just one case but in general I'd like to use results that allow someone to easily open and inspect the contents for debugging; other people on my team understandably find `pd.read_csv(result.location)` more intuitive than `import gcsfs; pd.DataFrame(cloudpickle.loads(gcsfs.GCSFileSystem().open(result.location, "rb").read()))`, plus you can just `head()` the file and so on. subclassing PandasSerializer seems like an interesting approach though, will play around with that some. thanks!
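(A sketch of the two debugging paths being contrasted here; the result locations below are placeholders, not real paths:)
```python
import pandas as pd

# with a PandasSerializer-backed result, the checkpoint is a plain file any teammate can open
df = pd.read_csv("gs://my-bucket/results/load.csv")  # placeholder path

# with the default PickleSerializer, you have to unpickle and rebuild it by hand
import cloudpickle
import gcsfs

with gcsfs.GCSFileSystem().open("gs://my-bucket/results/load.pickle", "rb") as f:  # placeholder path
    records = cloudpickle.loads(f.read())
df = pd.DataFrame(records)
```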
Kevin Kho:
Just wondering, but if you are using Pandas DataFrames like this, could you use a Dask DataFrame and not have to go through Prefect? Unless it’s very common for those individual parquets to fail.
Brett Naul:
yep failures are common, and we want the separate logs, checkpointing, etc.
👍 1