How I can set a serializer to a task without setti...
# ask-community
d
How can I set a serializer on a task without setting up the result? We can have multiple storage locations for our flows, which are specified at the flow level. We would like a task to serialize using the pandas serializer, but it seems we can only do that through the task's result. Any solution?
k
Hey @davzucky, you can use the Serializer directly by instantiating it and then calling the .serialize() and .deserialize() methods manually.
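import pandas as pd
from prefect.engine.serializers import PandasSerializer

# Serialize a DataFrame to CSV bytes without the index, then read it back
myserializer = PandasSerializer("csv", serialize_kwargs={"index": False})
test = pd.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
a = myserializer.serialize(test)
b = myserializer.deserialize(a)
print(b.head())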
This PandasSerializer is a bit coupled to having to specify a file type, but it works. If you do the serialization yourself, I think you just have to call it inside the task.
You can also override the result at the task level.
d
@Kevin Kho thank you for your reply. I think I wasn't clear enough. I want to use the result that we set up on the flow to save the file at a specific location, the same way we can override the target at the task level and have it used by the result. We would like to set a serializer the same way. Does that make sense? Or am I missing something about how to use the serializer? For example, I have a task that returns a dictionary from a REST call. I want this task to be serialized as JSON every time, but with the location set at the flow level.
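To make the request concrete, here is a minimal sketch of the serializer contract being discussed: an object with serialize() and deserialize() that turns a task's return value into bytes and back. DictJSONSerializer and the sample payload are hypothetical stand-ins written with the standard library only; they are not Prefect classes, and where the bytes end up would still be decided by whichever result is configured on the flow.

```python
import json


class DictJSONSerializer:
    """Hypothetical stand-in for a JSON serializer (not a Prefect class)."""

    def serialize(self, value) -> bytes:
        # Encode the value as UTF-8 JSON bytes; sort keys for stable output.
        return json.dumps(value, sort_keys=True).encode("utf-8")

    def deserialize(self, value: bytes):
        # Decode the bytes back into Python objects.
        return json.loads(value.decode("utf-8"))


# Pretend this dictionary came back from a REST call (made-up data).
payload = {"status": "ok", "items": [1, 2, 3]}

serializer = DictJSONSerializer()
blob = serializer.serialize(payload)
restored = serializer.deserialize(blob)
print(blob)
print(restored == payload)
```

The point of the sketch is only that the serializer knows nothing about storage: it maps values to bytes, which is why it would be convenient to choose it per task while leaving the location to the flow.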
z
This interface is very likely to see a rehaul this year as well, but there's no easy way to do this right now.
d
@Zanie thank you for your reply. Yes, that was my feeling. I tried a few things without success. After the change you made to the executor, this is one area that will need some love. Keep us updated and keep up the good work. At the moment we are just using a task that manually exports for us. Not the best design, but it works.
z
My first thought for a workaround is:
• Write a version of the task decorator that wraps the Prefect one and takes a serializer arg that it attaches as an attribute on the Task object
• When you attach your result type to your flow, attach it to each task instead and set the serializer that you attached earlier
d
I like your idea. I was experimenting with something similar but couldn't finish it: deriving the Task class into a custom task class (taskcustomserializer) that would replace the serializer of the result in its init call. However, this needs to work both locally and in Cloud. I will try your idea as well, which uses the same approach.
Thank you for your thoughts.
@Zanie I tried your idea. However, I'm not sure how to do the second point. What I tried is, in the init of my custom task, swapping the serializer of the result if there is one. But this is not working because the result is only set if you pass a custom one.
I looked deeper into the code and am thinking about:
• Adding a priority serializer to the base class
• Updating the task runner so that, if the task has a custom serializer, it replaces the one on the result with that one
I'm thinking of doing that as a PR to core. Do you think it will work? Would you accept this PR?
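The proposal above could look roughly like the sketch below: a task may carry an optional serializer, and at run time that serializer takes priority over the one on the flow's result, while the storage location stays whatever the flow configured. FakeResult, FakeTask, resolve_result, and the /tmp/flow-results path are hypothetical names made up for illustration using only the standard library; none of this is Prefect's actual task runner code.

```python
import copy
import json
import pickle
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class FakeResult:
    """Hypothetical result: a storage location plus a serialize function."""
    location: str
    serialize: Callable[[Any], bytes] = pickle.dumps


@dataclass
class FakeTask:
    """Hypothetical task that may carry its own serialize function."""
    name: str
    serialize: Optional[Callable[[Any], bytes]] = None


def resolve_result(flow_result: FakeResult, task: FakeTask) -> FakeResult:
    # Shallow-copy so the flow-level result is never mutated.
    result = copy.copy(flow_result)
    # A serializer set on the task takes priority over the result's own.
    if task.serialize is not None:
        result.serialize = task.serialize
    return result


def to_json(value: Any) -> bytes:
    return json.dumps(value).encode("utf-8")


flow_result = FakeResult(location="/tmp/flow-results")
plain = resolve_result(flow_result, FakeTask("plain"))
custom = resolve_result(flow_result, FakeTask("rest_call", serialize=to_json))

print(custom.location, custom.serialize({"a": 1}))
print(plain.serialize is pickle.dumps)
```

Copying the result instead of mutating it keeps one task's serializer from leaking into other tasks that share the same flow-level result.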
z
We're going to do some work on serializer/checkpointing/results in the next two quarters so we can't accept a PR on it right now
I think this should work for you
from prefect import task, Flow
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer


def my_task(fn=None, serializer=None, **kwargs):
    if fn is None:
        return lambda fn: my_task(
            fn=fn,
            serializer=serializer,
            **kwargs,
        )
    # Forward the remaining keyword arguments (e.g. log_stdout) to Prefect's task decorator
    this_task = task(fn, **kwargs)
    this_task.serializer = serializer
    return this_task


@my_task(log_stdout=True)
def say_data(data):
    print(repr(data))


@my_task(serializer=JSONSerializer())
def make_data():
    return True


with Flow("mz-test") as flow:
    data = make_data()
    say_data(data)


flow.result = LocalResult()
# Use a loop variable that does not shadow the imported `task` decorator
for flow_task in flow.tasks:
    serializer = getattr(flow_task, "serializer", None)
    if serializer:
        flow_task.result = LocalResult(serializer=serializer)
d
Yes, this will work. Thank you for the sample. I will use that; since we have our own custom command launcher, it will be simple to add. Thank you for your help. Looking forward to seeing the updated version.