    Nishtha Varshney

    1 year ago
    Hello, I am working with a simple ML model and trying to use the Prefect UI to view its flow. I have tried almost everything to debug it, following the old threads related to pickling errors, but nothing works. Can anyone please tell me what can be done? Error: Unexpected error: TypeError("cannot pickle 'weakref' object")
    Kevin Kho

    1 year ago
    Hi @Nishtha Varshney, task outputs need to be serializable by cloudpickle. You can find more information here. When you register, the default behavior is to serialize the Flow. I think this error is related to the Keras tensors. I think you should try saving these somewhere and passing the location to the downstream task, which can then load them. Alternatively, this might work if you store the Flow as a script. See this for more information. For your use case, I think you want:
    storage=Local(…, stored_as_script=True)
    Also, could you move the code to the thread so it doesn't crowd the main channel?
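A minimal sketch of the "save it somewhere and pass the location" idea, using only the standard library. `FakeModel`, `train_and_save`, `load_and_evaluate`, and the JSON file format are hypothetical stand-ins, not Prefect or Keras APIs; the point is only that the path string crosses the task boundary instead of the model object.

```python
import json
import os
import pickle
import tempfile
import weakref


class FakeModel:
    """Hypothetical stand-in for a Keras model: it holds a weakref, so pickling it fails."""

    def __init__(self, weights):
        self.weights = list(weights)
        self._self_ref = weakref.ref(self)  # mimics the internal weakrefs that break pickling


def is_picklable(value):
    """Return True if pickle can serialize the value, False otherwise."""
    try:
        pickle.dumps(value)
        return True
    except (TypeError, AttributeError, pickle.PicklingError):
        return False


def train_and_save(path):
    """Upstream step: build the model, save it to disk, and return only the path."""
    model = FakeModel([0.1, 0.2, 0.3])
    with open(path, "w") as fh:
        json.dump(model.weights, fh)
    return path


def load_and_evaluate(path):
    """Downstream step: rebuild the model from the saved file and use it."""
    with open(path) as fh:
        model = FakeModel(json.load(fh))
    return sum(model.weights)


model_path = os.path.join(tempfile.mkdtemp(), "model.json")
returned = train_and_save(model_path)
print(is_picklable(FakeModel([1.0])))  # the model object itself
print(is_picklable(returned))          # the path string handed downstream
print(load_and_evaluate(returned))
```

With a real Keras model, the save and load steps would presumably be `model.save(path)` and `tf.keras.models.load_model(path)`, with `train_model` returning the path and `test` taking it as an argument.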
    Nishtha Varshney

    1 year ago
    @Kevin Kho Thanks a lot, I'll try.
    import prefect
    import tensorflow as tf
    import numpy as np
    from tensorflow import keras
    from tensorflow.keras import layers
    from prefect import task,Flow,Parameter
    import logging
    import cloudpickle
    from prefect.executors import LocalExecutor
    from prefect.run_configs import LocalRun
    
    MARKDOWN=""" 
    Test_loss: {Test_loss}
    Accuracy: {Accuracy}
    """
    
    @task(nout=4)
    def train():
        # Model / data parameters
        num_classes = 10
        input_shape = (28, 28, 1)
        # the data, split between train and test sets
        (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
        # Scale images to the [0, 1] range
        x_train = x_train.astype("float32") / 255
        x_test = x_test.astype("float32") / 255
        # Make sure images have shape (28, 28, 1)
        x_train = np.expand_dims(x_train, -1)
        x_test = np.expand_dims(x_test, -1)
        # convert class vectors to binary class matrices
        y_train = tf.keras.utils.to_categorical(y_train, num_classes)
        y_test = tf.keras.utils.to_categorical(y_test, num_classes)
        return x_test,x_train,y_test,y_train
    
    @task
    def train_model(x_train,y_train):
        num_classes = 10
        input_shape = (28, 28, 1)
        model = tf.keras.Sequential(
            [
                tf.keras.Input(shape=input_shape),
                layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
                layers.MaxPooling2D(pool_size=(2, 2)),
                layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
                layers.MaxPooling2D(pool_size=(2, 2)),
                layers.Flatten(),
                layers.Dropout(0.5),
                layers.Dense(num_classes, activation="softmax"),
            ]
        )
        #model.summary()
        batch_size = 128
        epochs = 2
        model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
        model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_split=0.1)
        return model
    
    @task
    def test(model,x_test,y_test):
        score = model.evaluate(x_test, y_test, verbose=0)
        logger = prefect.context.get("logger")
        logger.info(score[0])
        logger.info(score[1])
        #print("Test loss:", score[0])
        #print("Test accuracy:", score[1]*100)
        prefect.artifacts.create_markdown(MARKDOWN.format(
            Test_loss=score[0],
            Accuracy=score[1]))
    
    with Flow("ml-model",executor=LocalExecutor()) as flow:  
        x_test,x_train,y_test,y_train = train()
        mod = train_model(x_train, y_train)
        test(mod,x_test,y_test)
    
    #flow.visualize()
    flow.run_config = LocalRun(env={'PREFECT__LOGGING__LEVEL':'DEBUG'})
    flow.register(project_name="data-science-flows")
    Kevin Kho

    1 year ago
    Thank you!
    Nishtha Varshney

    1 year ago
    @task(result=LocalResult(location="ml.prefect"))
    def train_model(x_train,y_train):
            num_classes = 10
            input_shape = (28, 28, 1)
            model = tf.keras.Sequential...
    @Kevin Kho when I try to store the model result locally, it shows the same error.
    Kevin Kho

    1 year ago
    The LocalResult there uses a PickleSerializer by default. I think in your case, you want to use a serializer that does nothing. Do this:
    from typing import Any
    
    class MySerializer(prefect.engine.serializers.Serializer):
    
        def serialize(self, value: Any) -> bytes:
            # transform a Python object into bytes
            pass
    
        def deserialize(self, value: bytes) -> Any:
            # recover a Python object from bytes
            pass
    
    LocalResult(location=…, serializer=MySerializer())
    As seen here.
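To illustrate the two-method interface above, here is a hedged, standard-library-only sketch. `JSONSerializer` is a hypothetical class that deliberately does not subclass the Prefect base class, and the metric values are made up for illustration. Note that a `serialize` method whose body is only `pass` returns `None` rather than bytes, so a working serializer has to fill in both methods.

```python
import json
from typing import Any


class JSONSerializer:
    """Hypothetical serializer with the same serialize/deserialize shape as the class above."""

    def serialize(self, value: Any) -> bytes:
        # transform a Python object into bytes
        return json.dumps(value).encode("utf-8")

    def deserialize(self, value: bytes) -> Any:
        # recover a Python object from bytes
        return json.loads(value.decode("utf-8"))


serializer = JSONSerializer()
# Illustrative values only, not results from the flow in this thread.
payload = serializer.serialize({"test_loss": 0.05, "accuracy": 0.98})
restored = serializer.deserialize(payload)
print(type(payload).__name__, restored)
```

For a Keras model, JSON would not be enough; one assumption-laden option is to have `serialize` save the model to a temporary file and return that file's bytes, and have `deserialize` do the reverse.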
    Nishtha Varshney

    1 year ago
    Thanks I'll try 🙂