{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Tensorflow advanced techniques cheat sheet"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow as tf\n",
"import numpy as np\n",
"from tensorflow import keras"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Custom loss functions, layers and models"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this section, I gathered cheat codes for customizing tensorflow/keras models, losses or layers"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Custom loss"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's try to implement the Huber loss:\n",
"$$\\displaystyle L_{\\delta }(y,f(x))={\\begin{cases}{\\frac {1}{2}}(y-f(x))^{2}&{\\textrm {for}}|y-f(x)|\\leq \\delta ,\\\\\\delta \\,(|y-f(x)|-{\\frac {1}{2}}\\delta ),&{\\textrm {otherwise.}}\\end{cases}}$$"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"def my_huber_loss(y_true, y_pred):\n",
" threshold = 1\n",
" error = y_true - y_pred\n",
" is_small_error = tf.abs(error) <= threshold\n",
" small_error_loss = tf.square(error) / 2\n",
" big_error_loss = threshold * (tf.abs(error) - (0.5 * threshold))\n",
" return tf.where(is_small_error, small_error_loss, big_error_loss)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's try it on the same vector"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"yt=np.array([-1.0, 0.0, 1.0, 2.0, 3.0, 4.0], dtype=float)\n",
"yp1=yt\n",
"yp2=yt*2+3"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"my_huber_loss(yt,yp1)"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"my_huber_loss(yt,yp2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To use your custom loss in your model, just specify it as your loss argument in your_model.compile"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model.compile(optimizer='sgd', loss=my_huber_loss) ##DO NOT RUN"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Custom loss with hyperparameters"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To customize a loss function with a specified hyperparameters, use a wrapper function that returns an instance of the loss function with the chosen value of the hyperparameter"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"def customize_huber_loss(threshold):\n",
" def my_huber_loss(y_true, y_pred):\n",
" error = y_true - y_pred\n",
" is_small_error = tf.abs(error) <= threshold\n",
" small_error_loss = tf.square(error) / 2\n",
" big_error_loss = threshold * (tf.abs(error) - (0.5 * threshold))\n",
" return tf.where(is_small_error, small_error_loss, big_error_loss)\n",
" \n",
" return my_huber_loss"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"2.125"
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"custom_loss=customize_huber_loss(0.5)\n",
"np.mean(custom_loss(yt,yp2))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To use it in your model:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model.compile(optimizer='sgd', loss=customize_huber_loss(0.5)) ##DO NOT RUN"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Another possibility is to use a Loss Object instead of a function, by defining a class object representing your loss, as follows:"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [],
"source": [
"from tensorflow.keras.losses import Loss\n",
"class MyHuberLoss(Loss):\n",
" threshold=1 ##default value\n",
" def __init__(self,threshold):\n",
" super().__init__()\n",
" self.threshold=threshold\n",
" \n",
" def call(self,y_true,y_pred):\n",
" error = y_true - y_pred\n",
" is_small_error = tf.abs(error) <= self.threshold\n",
" small_error_loss = tf.square(error) / 2\n",
" big_error_loss = self.threshold * (tf.abs(error) - (0.5 * self.threshold))\n",
" return tf.where(is_small_error, small_error_loss, big_error_loss)"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"custom_loss=MyHuberLoss(threshold=0.5)\n",
"custom_loss(yt,yp2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Use it in the model as follow:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model.compile(optimizer='sgd', loss=MyHuberLoss(threshold=0.5)) ##DO NOT RUN"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Contrastive loss"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It's called like this, because it contrasts two inputs and applies two different losses dependening on whether they are similar or not. \n",
"The goal is to learn an embedding such that similar inputs are close in the output space. \n",
"It was proposed by http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf \n",
"Here: \n",
"- $Y = 1$ if inputs are similar and 0 otherwise\n",
"- $\\hat{Y}$ : is the distance in embedding space"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"$$closs = y \\cdot \\hat{y}^2 + (1-y) \\cdot max(m-\\hat{y},0)^2$$"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [],
"source": [
"def contrastive_loss_with_margin(margin):\n",
" def contrastive_loss(y_true, y_pred):\n",
" '''Contrastive loss from Hadsell-et-al.'06\n",
" http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf\n",
" '''\n",
" square_pred = K.square(y_pred)\n",
" margin_square = K.square(K.maximum(margin - y_pred, 0))\n",
" return K.mean(y_true * square_pred + (1 - y_true) * margin_square)\n",
" return contrastive_loss"
]
},
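{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check on a made-up batch of labels and distances (the values below are arbitrary):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"closs = contrastive_loss_with_margin(margin=1.0)\n",
"y_sim = tf.constant([1.0, 1.0, 0.0, 0.0])   # 1 = similar pair, 0 = dissimilar\n",
"y_dist = tf.constant([0.1, 0.9, 0.2, 1.5])  # predicted distances in embedding space\n",
"closs(y_sim, y_dist)"
]
},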
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Lambda layers"
]
},
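{
"cell_type": "markdown",
"metadata": {},
"source": [
"`Lambda` layers wrap an arbitrary stateless function as a Keras layer, which is handy for quick custom computations that don't need trainable weights:"
]
},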
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
""
]
},
"execution_count": 38,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from keras import backend as K\n",
"def my_relu(x):\n",
" return K.maximum(0,x)\n",
" \n",
"##define your lambda layer with the custom computation defined previously\n",
"tf.keras.layers.Lambda(my_relu)\n",
"tf.keras.layers.Lambda(lambda x: tf.abs(x))"
]
},
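{
"cell_type": "markdown",
"metadata": {},
"source": [
"For instance, a minimal sketch (assuming 28x28 inputs such as MNIST) that plugs the custom activation into a `Sequential` model:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model = tf.keras.Sequential([\n",
"    tf.keras.layers.Flatten(input_shape=(28, 28)),\n",
"    tf.keras.layers.Dense(128),\n",
"    tf.keras.layers.Lambda(my_relu),\n",
"    tf.keras.layers.Dense(10, activation='softmax')\n",
"])"
]
},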
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Custom layers"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Common layers in tensorflow include:\n",
"
"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The structure of a layer is as follow: \n",
"
"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [],
"source": [
"# inherit from this base class\n",
"from tensorflow.keras.layers import Layer\n",
"\n",
"class SimpleDense(Layer):\n",
"\n",
" def __init__(self, units=32, activation=None):\n",
" '''Initializes the instance attributes'''\n",
" super(SimpleDense, self).__init__()\n",
" self.units = units\n",
" self.activation=tf.keras.activations.get(activation)\n",
"\n",
" def build(self, input_shape):\n",
" '''Create the state of the layer (weights)'''\n",
" # initialize the weights\n",
" w_init = tf.random_uniform_initializer()\n",
" self.w = tf.Variable(name=\"kernel\",\n",
" initial_value=w_init(shape=(input_shape[-1], self.units),\n",
" dtype='float32'),\n",
" trainable=True)\n",
"\n",
" # initialize the biases\n",
" b_init = tf.zeros_initializer()\n",
" self.b = tf.Variable(name=\"bias\",\n",
" initial_value=b_init(shape=(self.units,), dtype='float32'),\n",
" trainable=True)\n",
" \n",
" super().build(input_shape)\n",
"\n",
" def call(self, inputs):\n",
" '''Defines the computation from inputs to outputs'''\n",
" return self.activation(tf.matmul(inputs, self.w) + self.b)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, to customize the activations:"
]
},
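{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"## a minimal sketch with made-up layer sizes: the custom layer with a built-in activation\n",
"model = tf.keras.Sequential([\n",
"    tf.keras.layers.Flatten(input_shape=(28, 28)),\n",
"    SimpleDense(units=128, activation='relu'),\n",
"    tf.keras.layers.Dense(10, activation='softmax')\n",
"])"
]
},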
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Custom Models"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"you'll define all the layers in one function, `init`, and connect the layers together in another function, `call`."
]
},
{
"cell_type": "code",
"execution_count": 56,
"metadata": {},
"outputs": [],
"source": [
"from keras import Model\n",
"from keras.layers import Dense, Input,concatenate\n",
"\n",
"# inherit from the Model base class\n",
"class WideAndDeepModel(Model):\n",
" def __init__(self, units=30, activation='relu', **kwargs):\n",
" '''initializes the instance attributes'''\n",
" super().__init__(**kwargs)\n",
" self.hidden1 = Dense(units, activation=activation)\n",
" self.hidden2 = Dense(units, activation=activation)\n",
" self.main_output = Dense(1)\n",
" self.aux_output = Dense(1)\n",
"\n",
" def call(self, inputs):\n",
" '''defines the network architecture'''\n",
" input_A, input_B = inputs\n",
" hidden1 = self.hidden1(input_B)\n",
" hidden2 = self.hidden2(hidden1)\n",
" concat = concatenate([input_A, hidden2])\n",
" main_output = self.main_output(concat)\n",
" aux_output = self.aux_output(hidden2)\n",
" \n",
" return main_output, aux_output"
]
},
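{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal forward-pass sketch, assuming made-up input widths of 5 (wide path) and 6 (deep path):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model = WideAndDeepModel(units=30, activation='relu')\n",
"input_A = tf.random.normal((8, 5))  # wide input, fed directly to the concatenation\n",
"input_B = tf.random.normal((8, 6))  # deep input, fed through the hidden layers\n",
"main_out, aux_out = model((input_A, input_B))\n",
"main_out.shape, aux_out.shape  # both (8, 1)"
]
},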
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### ResNet model"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here's a picture of the model we'd like to build:\n",
"
"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We notice that the following blocks are repeated, so we build a submodel with the corresponding layers:\n",
"
"
]
},
{
"cell_type": "code",
"execution_count": 68,
"metadata": {},
"outputs": [],
"source": [
"class IdentityBlock(tf.keras.Model):\n",
" def __init__(self, filters, kernel_size):\n",
" super(IdentityBlock, self).__init__(name='')\n",
"\n",
" self.conv1 = tf.keras.layers.Conv2D(filters, kernel_size, padding='same')\n",
" self.bn1 = tf.keras.layers.BatchNormalization()\n",
"\n",
" self.conv2 = tf.keras.layers.Conv2D(filters, kernel_size, padding='same')\n",
" self.bn2 = tf.keras.layers.BatchNormalization()\n",
"\n",
" self.act = tf.keras.layers.Activation('relu')\n",
" self.add = tf.keras.layers.Add()\n",
" \n",
" def call(self, input_tensor):\n",
" x = self.conv1(input_tensor)\n",
" x = self.bn1(x)\n",
" x = self.act(x)\n",
"\n",
" x = self.conv2(x)\n",
" x = self.bn2(x)\n",
"\n",
" x = self.add([x, input_tensor])\n",
" x = self.act(x)\n",
" return x"
]
},
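{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick shape check on a random batch (the shape is made up; the channel count must equal `filters` so the skip connection can add the tensors):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"block = IdentityBlock(filters=64, kernel_size=3)\n",
"block(tf.random.normal((1, 28, 28, 64))).shape  # the block preserves the shape: (1, 28, 28, 64)"
]
},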
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Then, we define the architecture of the main model using the previous blocks and other layers:"
]
},
{
"cell_type": "code",
"execution_count": 69,
"metadata": {},
"outputs": [],
"source": [
"class ResNet(tf.keras.Model):\n",
" def __init__(self, num_classes):\n",
" super(ResNet, self).__init__()\n",
" self.conv = tf.keras.layers.Conv2D(64, 7, padding='same')\n",
" self.bn = tf.keras.layers.BatchNormalization()\n",
" self.act = tf.keras.layers.Activation('relu')\n",
" self.max_pool = tf.keras.layers.MaxPool2D((3, 3))\n",
"\n",
" # Use the Identity blocks that you just defined\n",
" self.id1a = IdentityBlock(64, 3)\n",
" self.id1b = IdentityBlock(64, 3)\n",
"\n",
" self.global_pool = tf.keras.layers.GlobalAveragePooling2D()\n",
" self.classifier = tf.keras.layers.Dense(num_classes, activation='softmax')\n",
"\n",
" def call(self, inputs):\n",
" x = self.conv(inputs)\n",
" x = self.bn(x)\n",
" x = self.act(x)\n",
" x = self.max_pool(x)\n",
"\n",
" # insert the identity blocks in the middle of the network\n",
" x = self.id1a(x)\n",
" x = self.id1b(x)\n",
"\n",
" x = self.global_pool(x)\n",
" return self.classifier(x)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, we can instantiate our model according to the problem dimension (number of classes ), and train it on MNIST for instance. Upload the notebook to Colab to run the following."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow_datasets as tfds\n",
"# utility function to normalize the images and return (image, label) pairs.\n",
"def preprocess(features):\n",
" return tf.cast(features['image'], tf.float32) / 255., features['label']\n",
"\n",
"# create a ResNet instance with 10 output units for MNIST\n",
"resnet = ResNet(10)\n",
"resnet.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])\n",
"\n",
"# load and preprocess the dataset\n",
"dataset = tfds.load('mnist', split=tfds.Split.TRAIN)\n",
"dataset = dataset.map(preprocess).batch(32)\n",
"\n",
"# train the model\n",
"resnet.fit(dataset, epochs=1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### VGG"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here's an illustration of the architecture we want to implement:\n",
"
"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We notice that there is a common structure made of a suite of a variable number of conv2D layers with variable filter sizes, followed by a maxpoll2d layer. So, we define it as a generic block. "
]
},
{
"cell_type": "code",
"execution_count": 81,
"metadata": {},
"outputs": [],
"source": [
"# Please uncomment all lines in this cell and replace those marked with `# YOUR CODE HERE`.\n",
"# You can select all lines in this code cell with Ctrl+A (Windows/Linux) or Cmd+A (Mac), then press Ctrl+/ (Windows/Linux) or Cmd+/ (Mac) to uncomment.\n",
"class Block(tf.keras.Model):\n",
" def __init__(self, filters, kernel_size, repetitions, pool_size=2, strides=2):\n",
" super(Block, self).__init__()\n",
" self.filters = filters\n",
" self.kernel_size = kernel_size\n",
" self.repetitions = repetitions\n",
" \n",
" # Define a conv2D_0, conv2D_1, etc based on the number of repetitions\n",
" for i in range(self.repetitions):\n",
" \n",
" # Define a Conv2D layer, specifying filters, kernel_size, activation and padding.\n",
" vars(self)[f'conv2D_{i}'] = tf.keras.layers.Conv2D(filters=self.filters,\n",
" kernel_size=self.kernel_size,\n",
" activation='relu',\n",
" padding='same'\n",
" )\n",
" \n",
" # Define the max pool layer that will be added after the Conv2D blocks\n",
" self.max_pool = tf.keras.layers.MaxPool2D(pool_size=(pool_size,pool_size),strides=(strides,strides))\n",
" \n",
" def call(self, inputs):\n",
" # access the class's conv2D_0 layer\n",
" conv2D_0 = vars(self)['conv2D_0']\n",
" \n",
" # Connect the conv2D_0 layer to inputs\n",
" x = conv2D_0(inputs)\n",
"\n",
" # for the remaining conv2D_i layers from 1 to `repetitions` they will be connected to the previous layer\n",
" for i in range(1,self.repetitions):\n",
" # access conv2D_i by formatting the integer `i`. (hint: check how these were saved using `vars()` earlier)\n",
" conv2D_i = vars(self)[f'conv2D_{i}']\n",
" \n",
" # Use the conv2D_i and connect it to the previous layer\n",
" x = conv2D_i(x)\n",
"\n",
" # Finally, add the max_pool layer\n",
" max_pool = self.max_pool(x)\n",
" \n",
" return max_pool"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we can define the full VGG model using the previous information."
]
},
{
"cell_type": "code",
"execution_count": 82,
"metadata": {},
"outputs": [],
"source": [
"# Please uncomment all lines in this cell and replace those marked with `# YOUR CODE HERE`.\n",
"# You can select all lines in this code cell with Ctrl+A (Windows/Linux) or Cmd+A (Mac), then press Ctrl+/ (Windows/Linux) or Cmd+/ (Mac) to uncomment.\n",
"class MyVGG(tf.keras.Model):\n",
"\n",
" def __init__(self, num_classes):\n",
" super(MyVGG, self).__init__()\n",
"\n",
" # Creating blocks of VGG with the following \n",
" # (filters, kernel_size, repetitions) configurations\n",
" self.block_a = Block(filters=64, kernel_size=3, repetitions=2, pool_size=2, strides=2)\n",
" self.block_b = Block(filters=128, kernel_size=3, repetitions=2, pool_size=2, strides=2)\n",
" self.block_c = Block(filters=256, kernel_size=3, repetitions=3, pool_size=2, strides=2)\n",
" self.block_d = Block(filters=512, kernel_size=3, repetitions=3, pool_size=2, strides=2)\n",
" self.block_e = Block(filters=512, kernel_size=3, repetitions=3, pool_size=2, strides=2)\n",
"\n",
" # Classification head\n",
" # Define a Flatten layer\n",
" self.flatten = tf.keras.layers.Flatten()\n",
" # Create a Dense layer with 256 units and ReLU as the activation function\n",
" self.fc = tf.keras.layers.Dense(units=256,activation='relu')\n",
" # Finally add the softmax classifier using a Dense layer\n",
" self.classifier = tf.keras.layers.Dense(units=num_classes,activation='softmax')\n",
"\n",
" def call(self, inputs):\n",
" # Chain all the layers one after the other\n",
" x = self.block_a(inputs)\n",
" x = self.block_b(x)\n",
" x = self.block_c(x)\n",
" x = self.block_d(x)\n",
" x = self.block_e(x)\n",
" x = self.flatten(x)\n",
" x = self.fc(x)\n",
" x = self.classifier(x)\n",
" return x"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here, we can instantiate the VGG model with the desired number of classes"
]
},
{
"cell_type": "code",
"execution_count": 83,
"metadata": {},
"outputs": [],
"source": [
"vgg=MyVGG(num_classes=10)"
]
},
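{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sanity check, a forward pass on a random batch (224x224 RGB is an assumption; any input large enough to survive the five pooling stages works):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"vgg(tf.random.normal((1, 224, 224, 3))).shape  # expect (1, 10)"
]
},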
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Callbacks"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Callbacks are a useful piece of functionality in Tensorflow that lets you have control during the training process. Useful to visualize the internal state of the model as well as intermediary statistics about the loss and metrics. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Common built-in callbacks"
]
},
{
"cell_type": "code",
"execution_count": 94,
"metadata": {},
"outputs": [],
"source": [
"from keras.callbacks import TensorBoard, ModelCheckpoint, EarlyStopping, CSVLogger\n",
"tb = TensorBoard(log_dir='logdir')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In collab: \n",
"%load_ext tensorboard"
]
},
{
"cell_type": "code",
"execution_count": 93,
"metadata": {},
"outputs": [],
"source": [
"chkpt=ModelCheckpoint(filepath='weights.{epoch:02d}-{val_loss:.2f}.h5',\n",
" save_weights_only=True,verbose=1,monitor='val_loss',save_best_only=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"es=EarlyStopping(patience=3,monitor='val_loss',mode='min',verbose=1,baseline=0.8,min_delta=0.001)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"csvlog=CSVLogger('log_file.csv')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"##DO NOT RUN\n",
"model.fit(...,callbacks=[chkpt,tb,es,csvlog])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Custom callback"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"
"
]
},
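{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"## a minimal sketch of a custom callback; the 0.7 ratio is a made-up hyperparameter\n",
"class DetectOverfittingCallback(tf.keras.callbacks.Callback):\n",
"    def __init__(self, threshold=0.7):\n",
"        super().__init__()\n",
"        self.threshold = threshold\n",
"\n",
"    def on_epoch_end(self, epoch, logs=None):\n",
"        # requires validation data so that `logs` contains 'val_loss'\n",
"        ratio = logs['val_loss'] / logs['loss']\n",
"        print(f'Epoch {epoch}: val/train loss ratio = {ratio:.2f}')\n",
"        if ratio > self.threshold:\n",
"            print('Ratio above threshold, stopping training...')\n",
"            self.model.stop_training = True"
]
},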
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "habmap",
"language": "python",
"name": "habmap"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}