Hello i am working with a simple ml model and tryi...
# ask-community
n
Hello i am working with a simple ml model and trying to use prefect ui to see the flow of it. I tried almost everything to debug it following the old threads related to pickling error, but nothing works. Can anyone please tell what can be done. Error - Unexpected error: TypeError("cannot pickle 'weakref' object") ``````
k
Hi @Nishtha Varshney, task outputs need to be serializeable by cloudpickle. You can find more information here . When you register, the default behavior is to serialize the Flow. I think this error might be related to the keras tensors. I think you should try saving these somewhere, and pass the location to the downstream task to load it. This might be able to work if you store the Flow as a script. See this for more information. For your use case, I think you want
storage=Local(…, stored_as_script=True)
.
Also, could you move the code to the thread for me to not crowd the main channel?
n
@Kevin Kho Thanks a lot I'll try
Copy code
import prefect
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
from prefect import task,Flow,Parameter
import logging
import cloudpickle
from prefect.executors import LocalExecutor
from prefect.run_configs import LocalRun

MARKDOWN=""" 
Test_loss: {Test_loss}
Accuracy: {Accuracy}
"""

@task(nout=4)
def train():
# Model / data parameters
    num_classes = 10
    input_shape = (28, 28, 1)
    # the data, split between train and test sets
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    # Scale images to the [0, 1] range
    x_train = x_train.astype("float32") / 255
    x_test = x_test.astype("float32") / 255
    # Make sure images have shape (28, 28, 1)
    x_train = np.expand_dims(x_train, -1)
    x_test = np.expand_dims(x_test, -1)
    # convert class vectors to binary class matrices
    y_train = tf.keras.utils.to_categorical(y_train, num_classes)
    y_test = tf.keras.utils.to_categorical(y_test, num_classes)
    return x_test,x_train,y_test,y_train

@task
def train_model(x_train,y_train):
        num_classes = 10
        input_shape = (28, 28, 1)
        model = tf.keras.Sequential(
            [
                tf.keras.Input(shape=input_shape),
                layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
                layers.MaxPooling2D(pool_size=(2, 2)),
                layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
                layers.MaxPooling2D(pool_size=(2, 2)),
                layers.Flatten(),
                layers.Dropout(0.5),
                layers.Dense(num_classes, activation="softmax"),
            ]
        )
        #model.summary()
        batch_size = 128
        epochs = 2
        model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
        model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_split=0.1)
        return model

@task
def test(model,x_test,y_test):
    score = model.evaluate(x_test, y_test, verbose=0)
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>(score[0])
    <http://logger.info|logger.info>(score[1])
    #print("Test loss:", score[0])
    #print("Test accuracy:", score[1]*100)
    prefect.artifacts.create_markdown(MARKDOWN.format(
		Test_loss=score[0],
		Accuracy=score[1]))

with Flow("ml-model",executor=LocalExecutor()) as flow:  
    x_test,x_train,y_test,y_train = train()
    mod = train_model(x_train, y_train)
    test(mod,x_test,y_test)

#flow.visualize()
flow.run_config = LocalRun(env={'PREFECT__LOGGING__LEVEL':'DEBUG'})
flow.register(project_name="data-science-flows")
k
Thank you!
n
Copy code
@task(result=LocalResult(location="ml.prefect"))
def train_model(x_train,y_train):
        num_classes = 10
        input_shape = (28, 28, 1)
        model = tf.keras.Sequential...
@Kevin Kho when i try to store the result of model locally it shows the same error
k
The LocalResult there uses a PickleSerializer by default. I think in your case, you want to use a serializer that does nothing. Do this:
Copy code
class MySerializer(prefect.engine.serializers.Serializer):

    def serialize(self, value: Any) -> bytes:
        # transform a Python object into bytes
        pass

    def deserialize(self, value: bytes) -> Any:
        # recover a Python object from bytes
        pass
LocalResult(location=…, serializer=MySeriliazer())
. As seen here
n
Thanks I'll try 🙂