davzucky
07/02/2021, 7:18 AMKevin Kho
.serialize()
and .deserialize()
methods manually.
# Round-trip a small DataFrame through PandasSerializer by hand:
# serialize it, deserialize the result, and show the first rows.
myserializer = PandasSerializer("csv", serialize_kwargs=dict(index=False))
test = pd.DataFrame({column: [1, 2, 3] for column in ("a", "b")})
a = myserializer.serialize(test)
b = myserializer.deserialize(a)
preview = b.head()
print(preview)
Kevin Kho
PandasSerializer
is a bit coupled to having to specify a file type, but this works. I think if you do the serialization yourself, you just have to call it inside the task. Kevin Kho
davzucky
07/02/2021, 2:29 PMZanie
davzucky
07/02/2021, 2:35 PMZanie
davzucky
07/02/2021, 2:49 PMdavzucky
07/02/2021, 2:50 PMdavzucky
07/07/2021, 12:57 AMdavzucky
07/07/2021, 12:59 AMZanie
Zanie
Zanie
from prefect import task, Flow
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer
def my_task(fn=None, serializer=None, **kwargs):
    """Create a task from ``fn`` and tag it with a ``serializer`` attribute.

    Usable both bare (``@my_task``) and with arguments
    (``@my_task(serializer=..., log_stdout=True)``).

    Args:
        fn: The function to wrap. ``None`` when the decorator is called
            with arguments; a decorator is returned in that case.
        serializer: Value stored on the resulting task as ``.serializer``
            (``None`` if not given).
        **kwargs: Extra keyword arguments forwarded to ``task`` when the
            task is created (e.g. ``log_stdout=True``).

    Returns:
        The object returned by ``task(fn, **kwargs)`` with ``.serializer``
        set, or a one-argument decorator when ``fn`` is ``None``.
    """
    if fn is None:
        # Called as ``@my_task(...)``: defer until the function is supplied.
        return lambda fn: my_task(
            fn=fn,
            serializer=serializer,
            **kwargs,
        )
    # Forward the extra options; previously they were silently dropped here,
    # so e.g. ``log_stdout=True`` never reached the task.
    this_task = task(fn, **kwargs)
    this_task.serializer = serializer
    return this_task
@my_task(log_stdout=True)
def say_data(data):
    """Print the ``repr`` of ``data`` to stdout."""
    print(f"{data!r}")
@my_task(serializer=JSONSerializer())
def make_data():
    """Return the constant ``True`` as this task's output."""
    return True
# Build the flow: make_data feeds its output into say_data.
with Flow("mz-test") as flow:
    data = make_data()
    say_data(data)

# Give the flow itself a LocalResult.
flow.result = LocalResult()

# Tasks that carry a truthy ``serializer`` attribute get their own
# LocalResult built with that serializer; all other tasks are left untouched.
# The loop variable is not named ``task`` so that it does not shadow (and
# rebind) the ``task`` decorator imported from prefect.
for flow_task in flow.tasks:
    serializer = getattr(flow_task, "serializer", None)
    if serializer:
        flow_task.result = LocalResult(serializer=serializer)
davzucky
07/07/2021, 11:29 PM