Nishtha Varshney
07/12/2021, 11:58 AMKevin Kho
storage=Local(…, stored_as_script=True)
.Kevin Kho
Nishtha Varshney
07/12/2021, 4:04 PMNishtha Varshney
07/12/2021, 4:04 PMimport prefect
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
from prefect import task,Flow,Parameter
import logging
import cloudpickle
from prefect.executors import LocalExecutor
from prefect.run_configs import LocalRun
MARKDOWN="""
Test_loss: {Test_loss}
Accuracy: {Accuracy}
"""
@task(nout=4)
def train():
# Model / data parameters
num_classes = 10
input_shape = (28, 28, 1)
# the data, split between train and test sets
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
# Scale images to the [0, 1] range
x_train = x_train.astype("float32") / 255
x_test = x_test.astype("float32") / 255
# Make sure images have shape (28, 28, 1)
x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
# convert class vectors to binary class matrices
y_train = tf.keras.utils.to_categorical(y_train, num_classes)
y_test = tf.keras.utils.to_categorical(y_test, num_classes)
return x_test,x_train,y_test,y_train
@task
def train_model(x_train,y_train):
num_classes = 10
input_shape = (28, 28, 1)
model = tf.keras.Sequential(
[
tf.keras.Input(shape=input_shape),
layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
layers.MaxPooling2D(pool_size=(2, 2)),
layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
layers.MaxPooling2D(pool_size=(2, 2)),
layers.Flatten(),
layers.Dropout(0.5),
layers.Dense(num_classes, activation="softmax"),
]
)
#model.summary()
batch_size = 128
epochs = 2
model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_split=0.1)
return model
@task
def test(model,x_test,y_test):
score = model.evaluate(x_test, y_test, verbose=0)
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(score[0])
<http://logger.info|logger.info>(score[1])
#print("Test loss:", score[0])
#print("Test accuracy:", score[1]*100)
prefect.artifacts.create_markdown(MARKDOWN.format(
Test_loss=score[0],
Accuracy=score[1]))
with Flow("ml-model",executor=LocalExecutor()) as flow:
x_test,x_train,y_test,y_train = train()
mod = train_model(x_train, y_train)
test(mod,x_test,y_test)
#flow.visualize()
flow.run_config = LocalRun(env={'PREFECT__LOGGING__LEVEL':'DEBUG'})
flow.register(project_name="data-science-flows")
Kevin Kho
Nishtha Varshney
07/12/2021, 5:52 PM@task(result=LocalResult(location="ml.prefect"))
def train_model(x_train,y_train):
num_classes = 10
input_shape = (28, 28, 1)
model = tf.keras.Sequential...
@Kevin Kho when i try to store the result of model locally it shows the same errorKevin Kho
class MySerializer(prefect.engine.serializers.Serializer):
def serialize(self, value: Any) -> bytes:
# transform a Python object into bytes
pass
def deserialize(self, value: bytes) -> Any:
# recover a Python object from bytes
pass
LocalResult(location=…, serializer=MySeriliazer())
. As seen hereNishtha Varshney
07/12/2021, 6:05 PM