TensorFlow 2.0 Deep Learning (Part 3: Convolutional Neural Networks, Part 2)

Classical convolutional network architectures

LeNet-5
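
LeNet-5 stacks two convolution-pooling units followed by three fully connected layers. Below is a minimal sketch, assuming the classic filter counts (6, 16, 120, 84, 10) adapted to a 28x28 grayscale input; the final Dense layer outputs logits so that the softmax can be folded into the loss, as discussed next.

import tensorflow as tf
from tensorflow.keras import Sequential, layers

lenet5 = Sequential([
    layers.Conv2D(6, kernel_size=5, activation='relu'),  # [b,28,28,1] => [b,24,24,6]
    layers.MaxPool2D(pool_size=2, strides=2),            # => [b,12,12,6]
    layers.Conv2D(16, kernel_size=5, activation='relu'), # => [b,8,8,16]
    layers.MaxPool2D(pool_size=2, strides=2),            # => [b,4,4,16]
    layers.Flatten(),                                     # => [b,256]
    layers.Dense(120, activation='relu'),
    layers.Dense(84, activation='relu'),
    layers.Dense(10, activation=None)                     # output logits, no softmax here
])
lenet5.build(input_shape=(None, 28, 28, 1))
lenet5.summary()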

import tensorflow as tf
# View the default parameters and usage notes of the categorical_crossentropy function
help(tf.losses.categorical_crossentropy)
# With the default from_logits=False, the network prediction y_pred must already have passed through the softmax function.
# With from_logits=True, the network prediction y_pred must be the raw variable z that has NOT passed through softmax:
# the softmax activation is then applied inside the loss function itself, so it does not need to be added manually,
# and folding softmax into the loss improves the numerical stability of the computation.

Help on function categorical_crossentropy in module tensorflow.python.keras.losses:
categorical_crossentropy(y_true, y_pred, from_logits=False, label_smoothing=0) 
    Computes the categorical crossentropy loss.
    Args:
      y_true: tensor of true targets.
      y_pred: tensor of predicted targets.
      from_logits: Whether `y_pred` is expected to be a logits tensor. By default,
        we assume that `y_pred` encodes a probability distribution.
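
A small example of the behaviour described above (toy 3-class values, not from the source): passing raw logits with from_logits=True gives the same loss as passing softmax probabilities with the default from_logits=False, while being numerically more stable.

import tensorflow as tf

y_true = tf.constant([[0., 1., 0.]])  # one-hot label of a 3-class toy example
z = tf.constant([[2.0, 1.0, 0.1]])    # raw network output (logits), no softmax applied

# Recommended: keep the network output as logits and let the loss apply softmax internally
loss_logits = tf.losses.categorical_crossentropy(y_true, z, from_logits=True)
# Equivalent but less stable: apply softmax manually and use the default from_logits=False
loss_probs = tf.losses.categorical_crossentropy(y_true, tf.nn.softmax(z), from_logits=False)
print(float(loss_logits), float(loss_probs))  # the two values match closely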

AlexNet

VGG

GoogLeNet

CIFAR10 and VGG13

def preprocess(x, y):
    # Normalize: map data to [-1, 1]
    x = 2*tf.cast(x, dtype=tf.float32) / 255.-1
    y = tf.cast(y, dtype=tf.int32)
    return x,y
    
# Download (if necessary) and load the CIFAR100 dataset
(x,y), (x_test, y_test) = datasets.cifar100.load_data()
# Remove the redundant dimension of y: [b, 1] => [b]
y = tf.squeeze(y, axis=1)
y_test = tf.squeeze(y_test, axis=1)
# Print training and test set shapes
print(x.shape, y.shape, x_test.shape, y_test.shape)
# Building training set objects
train_db = tf.data.Dataset.from_tensor_slices((x,y))
train_db = train_db.shuffle(1000).map(preprocess).batch(128)
# Building test set objects
test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test))
test_db = test_db.map(preprocess).batch(128)
# Sample a Batch from the training set and observe
sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape, tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))

# Create a list holding the convolutional layers first
conv_layers = [ 
    # Conv-Conv-Pooling unit 1
    # 64 3x3 kernels; 'same' padding keeps the spatial size of input and output equal
    layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    # Halve the height and width
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),

    # Conv-Conv-Pooling unit 2: output channels increase to 128, height and width are halved
    layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    # Halve the height and width
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),

    # Conv-Conv-Pooling unit 3: output channels increase to 256, height and width are halved
    layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    # Halve the height and width
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),

    # Conv-Conv-Pooling unit 4: output channels increase to 512, height and width are halved
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    # Halve the height and width
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),

    # Conv-Conv-Pooling unit 5: output channels stay at 512, height and width are halved
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    # Halve the height and width
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same')
]

# Using the layer list created earlier to build a network container
conv_net = Sequential(conv_layers)
 
# Create a 3-layer fully connected subnetwork
fc_net = Sequential([
    layers.Dense(256, activation=tf.nn.relu),
    layers.Dense(128, activation=tf.nn.relu),
    layers.Dense(100, activation=None),
])
 
# Build 2 subnetworks and print network parameter information
conv_net.build(input_shape=[4, 32, 32, 3])
fc_net.build(input_shape=[4, 512])
conv_net.summary()
fc_net.summary()
 
# List concatenation: merge the trainable parameters of the 2 subnetworks
variables = conv_net.trainable_variables + fc_net.trainable_variables
# Compute the gradient of the loss with respect to all parameters
# (tape, loss and optimizer are defined in the complete program below)
grads = tape.gradient(loss, variables)
# Update all parameters automatically
optimizer.apply_gradients(zip(grads, variables))

import  tensorflow as tf
from    tensorflow.keras import layers, optimizers, datasets, Sequential
import  os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
tf.random.set_seed(2345)

# 5 units of conv + max pooling
conv_layers = [ 
    # unit 1
    layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
    # unit 2
    layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
    # unit 3
    layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
    # unit 4
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
    # unit 5
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same')
]

def preprocess(x, y):
    # Normalize: map data to [-1, 1]
    x = 2*tf.cast(x, dtype=tf.float32) / 255.-1
    y = tf.cast(y, dtype=tf.int32)
    return x,y

(x,y), (x_test, y_test) = datasets.cifar10.load_data()
y = tf.squeeze(y, axis=1)
y_test = tf.squeeze(y_test, axis=1)
print(x.shape, y.shape, x_test.shape, y_test.shape)
train_db = tf.data.Dataset.from_tensor_slices((x,y))
train_db = train_db.shuffle(1000).map(preprocess).batch(128)
test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test))
test_db = test_db.map(preprocess).batch(64)
sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape,
      tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))

def main():
    # [b, 32, 32, 3] => [b, 1, 1, 512]
    conv_net = Sequential(conv_layers)
    fc_net = Sequential([
        layers.Dense(256, activation=tf.nn.relu),
        layers.Dense(128, activation=tf.nn.relu),
        layers.Dense(10, activation=None),
    ])
    conv_net.build(input_shape=[None, 32, 32, 3])
    fc_net.build(input_shape=[None, 512])
    conv_net.summary()
    fc_net.summary()

    optimizer = optimizers.Adam(lr=1e-4)
    # [1, 2] + [3, 4] => [1, 2, 3, 4]
    variables = conv_net.trainable_variables + fc_net.trainable_variables

    for epoch in range(50):
        for step, (x,y) in enumerate(train_db):
            with tf.GradientTape() as tape:
                # [b, 32, 32, 3] => [b, 1, 1, 512]
                out = conv_net(x)
                # flatten, => [b, 512]
                out = tf.reshape(out, [-1, 512])
                # [b, 512] => [b, 10]
                logits = fc_net(out)
                # [b] => [b, 10]
                y_onehot = tf.one_hot(y, depth=10)
                # compute loss
                loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
                loss = tf.reduce_mean(loss)
            grads = tape.gradient(loss, variables)
            optimizer.apply_gradients(zip(grads, variables))
            if step %100 == 0:
                print(epoch, step, 'loss:', float(loss))

        total_num = 0
        total_correct = 0
        for x,y in test_db:
            out = conv_net(x)
            out = tf.reshape(out, [-1, 512])
            logits = fc_net(out)
            prob = tf.nn.softmax(logits, axis=1) #Convert output to probability
            pred = tf.argmax(prob, axis=1) #Obtain the index value corresponding to the maximum probability value as the category number
            pred = tf.cast(pred, dtype=tf.int32)
            correct = tf.cast(tf.equal(pred, y), dtype=tf.int32) #Check whether the index value (category number) corresponding to the maximum probability value is consistent with the real label category number
            correct = tf.reduce_sum(correct)
            total_num += x.shape[0]
            total_correct += int(correct)
        acc = total_correct / total_num
        print(epoch, 'acc:', acc)

if __name__ == '__main__':
    main()

 

Deep residual networks: ResNet and DenseNet

import  os
import  tensorflow as tf
import  numpy as np
from    tensorflow import keras
 
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
 
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()
x_train, x_test = x_train.astype(np.float32)/255., x_test.astype(np.float32)/255.
# [b, 28, 28] => [b, 28, 28, 1]
x_train, x_test = np.expand_dims(x_train, axis=3), np.expand_dims(x_test, axis=3)
# one hot encode the labels. convert back to numpy as we cannot use a combination of numpy
# and tensors as input to keras
y_train_ohe = tf.one_hot(y_train, depth=10).numpy()
y_test_ohe = tf.one_hot(y_test, depth=10).numpy()
print(x_train.shape, y_train.shape) #(60000, 28, 28, 1) (60000,)
print(x_test.shape, y_test.shape) #(10000, 28, 28, 1) (10000,)

# 3x3 convolution
def conv3x3(channels, stride=1, kernel=(3, 3)):
    return keras.layers.Conv2D(channels, kernel, strides=stride, padding='same', use_bias=False, kernel_initializer=tf.random_normal_initializer())

class ResnetBlock(keras.Model):
    def __init__(self, channels, strides=1, residual_path=False):
        super(ResnetBlock, self).__init__()
        self.channels = channels
        self.strides = strides
        self.residual_path = residual_path
        self.conv1 = conv3x3(channels, strides)
        self.bn1 = keras.layers.BatchNormalization()
        self.conv2 = conv3x3(channels)
        self.bn2 = keras.layers.BatchNormalization()
        if residual_path:
            # When downsampling, a 1x1 convolution transforms the shortcut branch so that it can
            # be added element-wise to the output of this block
            self.down_conv = conv3x3(channels, strides, kernel=(1, 1))
            self.down_bn = tf.keras.layers.BatchNormalization()

    def call(self, inputs, training=None):
        residual = inputs
        x = self.bn1(inputs, training=training)
        x = tf.nn.relu(x)
        x = self.conv1(x)
        x = self.bn2(x, training=training)
        x = tf.nn.relu(x)
        x = self.conv2(x)
        # Note: sub-layers must be created in __init__ and assigned to attributes of self so that
        # their variables are tracked; layers created on the fly inside call() would not be registered.
        if self.residual_path:
            residual = self.down_bn(inputs, training=training)
            residual = tf.nn.relu(residual)
            residual = self.down_conv(residual)
        x = x + residual
        return x

class ResNet(keras.Model):
    def __init__(self, block_list, num_classes, initial_filters=16, **kwargs):
        super(ResNet, self).__init__(**kwargs)
        self.num_blocks = len(block_list)
        self.block_list = block_list
        self.in_channels = initial_filters
        self.out_channels = initial_filters
        self.conv_initial = conv3x3(self.out_channels)
        self.blocks = keras.models.Sequential(name='dynamic-blocks')

        # build all the blocks
        # iterate over every block
        for block_id in range(len(block_list)):
            # iterate over every layer inside the block
            for layer_id in range(block_list[block_id]):
                if block_id != 0 and layer_id == 0:
                    # The first layer of every block after the first downsamples:
                    # out_channels has just been doubled, so strides=2 halves the height and width,
                    # and residual_path=True adds a 1x1 convolution so that the output of the
                    # previous block can still be added element-wise as the shortcut
                    block = ResnetBlock(self.out_channels, strides=2, residual_path=True)
                else:
                    # All other layers keep the spatial size; a 1x1 shortcut convolution would only
                    # be needed if the channel count changed, which does not happen here, so
                    # residual_path remains False and the identity shortcut is used
                    if self.in_channels != self.out_channels:
                        residual_path = True
                    else:
                        residual_path = False
                    block = ResnetBlock(self.out_channels, residual_path=residual_path)

                self.in_channels = self.out_channels
                self.blocks.add(block)
            self.out_channels *= 2 # every new block doubles the number of output channels

        self.final_bn = keras.layers.BatchNormalization()
        self.avg_pool = keras.layers.GlobalAveragePooling2D()
        self.fc = keras.layers.Dense(num_classes)

    def call(self, inputs, training=None):
        out = self.conv_initial(inputs)
        out = self.blocks(out, training=training)
        out = self.final_bn(out, training=training)
        out = tf.nn.relu(out)
        out = self.avg_pool(out)
        out = self.fc(out)
        return out

def main():
    num_classes = 10 # number of classes
    batch_size = 32
    epochs = 1
    # build model and optimizer
    # Stack 3 blocks, each containing 2 ResnetBlock layers
    model = ResNet([2, 2, 2], num_classes)
    model.compile(optimizer=keras.optimizers.Adam(0.001),
                  loss=keras.losses.CategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])
    model.build(input_shape=(None, 28, 28, 1))
    print("Number of variables in the model :", len(model.variables))
    model.summary()
    # train
    model.fit(x_train, y_train_ohe, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test_ohe), verbose=1)
    # evaluate on test set
    # model.evaluate() returns the loss and accuracy on the test set
    scores = model.evaluate(x_test, y_test_ohe, batch_size, verbose=1)
    print("Final test loss and accuracy :", scores)

if __name__ == '__main__':
    main()

import  tensorflow as tf
from    tensorflow import keras
from    tensorflow.keras import layers, Sequential
 
class BasicBlock(layers.Layer):
    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()

        if stride != 1:
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else:
            self.downsample = lambda x:x

    def call(self, inputs, training=None):
        # [b, h, w, c]
        out = self.conv1(inputs)
        out = self.bn1(out,training=training)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out,training=training)
        identity = self.downsample(inputs)
        output = layers.add([out, identity])
        output = tf.nn.relu(output)
        return output
 
class ResNet(keras.Model):
    def __init__(self, layer_dims, num_classes=100): # [2, 2, 2, 2]
        super(ResNet, self).__init__()
        self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                layers.BatchNormalization(),
                                layers.Activation('relu'),
                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
                                ])
        self.layer1 = self.build_resblock(64,  layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
        # output: [b, 512, h, w],
        self.avgpool = layers.GlobalAveragePooling2D()
        self.fc = layers.Dense(num_classes)

    def call(self, inputs, training=None):
        x = self.stem(inputs,training=training)
        x = self.layer1(x,training=training)
        x = self.layer2(x,training=training)
        x = self.layer3(x,training=training)
        x = self.layer4(x,training=training)
        # [b, c]
        x = self.avgpool(x)
        # [b, 100]
        x = self.fc(x)
        return x

    def build_resblock(self, filter_num, blocks, stride=1):
        res_blocks = Sequential()
        # may down sample
        res_blocks.add(BasicBlock(filter_num, stride))
        for _ in range(1, blocks):
            res_blocks.add(BasicBlock(filter_num, stride=1))
        return res_blocks

def resnet18():
    return ResNet([2, 2, 2, 2])

def resnet34():
    return ResNet([3, 4, 6, 3])
import  os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
import  tensorflow as tf
from    tensorflow.keras import layers, optimizers, datasets, Sequential 
from    resnet import resnet18 
tf.random.set_seed(2345)

def preprocess(x, y):
    # Normalize: map data to [-0.5, 0.5]
    x = tf.cast(x, dtype=tf.float32) / 255. - 0.5
    y = tf.cast(y, dtype=tf.int32)
    return x,y

(x,y), (x_test, y_test) = datasets.cifar100.load_data()
y = tf.squeeze(y, axis=1)
y_test = tf.squeeze(y_test, axis=1)
print(x.shape, y.shape, x_test.shape, y_test.shape)
train_db = tf.data.Dataset.from_tensor_slices((x,y))
train_db = train_db.shuffle(1000).map(preprocess).batch(512)
test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test))
test_db = test_db.map(preprocess).batch(512)
sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape, tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))

def main():
    # [b, 32, 32, 3] => [b, 1, 1, 512]
    model = resnet18()
    model.build(input_shape=(None, 32, 32, 3))
    model.summary()
    optimizer = optimizers.Adam(lr=1e-3)

    for epoch in range(500):
        for step, (x,y) in enumerate(train_db):
            with tf.GradientTape() as tape:
                # [b, 32, 32, 3] => [b, 100]
                logits = model(x,training=True)
                # [b] => [b, 100]
                y_onehot = tf.one_hot(y, depth=100)
                # compute loss
                loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
                loss = tf.reduce_mean(loss)

            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            if step %50 == 0:
                print(epoch, step, 'loss:', float(loss))

        total_num = 0
        total_correct = 0
        for x,y in test_db:
            logits = model(x,training=False)
            prob = tf.nn.softmax(logits, axis=1) #Convert output to probability
            pred = tf.argmax(prob, axis=1)
            pred = tf.cast(pred, dtype=tf.int32)
            correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
            correct = tf.reduce_sum(correct)
            total_num += x.shape[0]
            total_correct += int(correct)
        acc = total_correct / total_num
        print(epoch, 'acc:', acc)

if __name__ == '__main__':
    main()
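
The heading of this section also mentions DenseNet. Unlike ResNet, which adds the shortcut to the block output, DenseNet concatenates the feature maps of all previous layers along the channel axis. A minimal sketch of a dense block follows (growth rate and layer count are assumed illustration values, not taken from the source):

import tensorflow as tf
from tensorflow.keras import layers

class DenseLayer(layers.Layer):
    # one BN-ReLU-Conv unit that produces growth_rate new feature maps
    def __init__(self, growth_rate=12):
        super(DenseLayer, self).__init__()
        self.bn = layers.BatchNormalization()
        self.conv = layers.Conv2D(growth_rate, (3, 3), padding='same')

    def call(self, inputs, training=None):
        x = self.bn(inputs, training=training)
        x = tf.nn.relu(x)
        x = self.conv(x)
        # concatenate the new feature maps with all previous ones along the channel axis
        return tf.concat([inputs, x], axis=3)

class DenseBlock(layers.Layer):
    # stack several DenseLayers; the channel count grows by growth_rate per layer
    def __init__(self, num_layers=4, growth_rate=12):
        super(DenseBlock, self).__init__()
        self.dense_layers = [DenseLayer(growth_rate) for _ in range(num_layers)]

    def call(self, inputs, training=None):
        x = inputs
        for layer in self.dense_layers:
            x = layer(x, training=training)
        return x

# quick shape check: 16 input channels + 4 layers * growth rate 12 => 64 output channels
x = tf.random.normal([2, 32, 32, 16])
print(DenseBlock()(x).shape) # (2, 32, 32, 64)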

CIFAR10 and ResNet18

class BasicBlock(layers.Layer):
    # Residual module class
    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()
        # f(x) contains two ordinary convolution layers, creating convolution layer 1
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        # Create convolution layer 2
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()
        # When stride != 1, insert a 1x1 convolution so the shortcut shape matches the output
        if stride != 1: 
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else: 
            # Otherwise, pass the input through unchanged (identity shortcut)
            self.downsample = lambda x:x
 
    def call(self, inputs, training=None):
        # Forward propagation
        out = self.conv1(inputs) # through the first convolution layer
        out = self.bn1(out, training=training)
        out = self.relu(out)
        out = self.conv2(out) # through the second convolution layer
        out = self.bn2(out, training=training)
        # Transform the input through the shortcut branch
        identity = self.downsample(inputs)
        # f(x) + x operation
        output = layers.add([out, identity])
        # Apply the activation function and return
        output = tf.nn.relu(output)
        return output
import  tensorflow as tf
from    tensorflow import keras
from    tensorflow.keras import layers, Sequential
 
class BasicBlock(layers.Layer):
    # Residual module
    def __init__(self, filter_num, stride=1):
        super(BasicBlock, self).__init__()
        # First convolution unit
        self.conv1 = layers.Conv2D(filter_num, (3, 3), strides=stride, padding='same')
        self.bn1 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        # Second convolution unit
        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')
        self.bn2 = layers.BatchNormalization()
        # shape matching by 1x1 convolution
        if stride != 1:
            self.downsample = Sequential()
            self.downsample.add(layers.Conv2D(filter_num, (1, 1), strides=stride))
        else:
            # shapes already match; connect the input directly (identity shortcut)
            self.downsample = lambda x:x

    def call(self, inputs, training=None):
        # [b, h, w, c], through the first convolution unit
        out = self.conv1(inputs)
        out = self.bn1(out, training=training)
        out = self.relu(out)
        # Through the second convolution unit
        out = self.conv2(out)
        out = self.bn2(out, training=training)
        # Through identity module
        identity = self.downsample(inputs)
        # 2 paths output add directly
        output = layers.add([out, identity])
        output = tf.nn.relu(output) # Activation function
        return output

class ResNet(keras.Model):
    # Generic ResNet implementation class
    def __init__(self, layer_dims, num_classes=10): # [2, 2, 2, 2]
        super(ResNet, self).__init__()
        # Root network, preprocessing
        self.stem = Sequential([layers.Conv2D(64, (3, 3), strides=(1, 1)),
                                layers.BatchNormalization(),
                                layers.Activation('relu'),
                                layers.MaxPool2D(pool_size=(2, 2), strides=(1, 1), padding='same')
                                ])
        # Stack 4 blocks; each block contains multiple BasicBlocks with different stride settings
        self.layer1 = self.build_resblock(64,  layer_dims[0])
        self.layer2 = self.build_resblock(128, layer_dims[1], stride=2)
        self.layer3 = self.build_resblock(256, layer_dims[2], stride=2)
        self.layer4 = self.build_resblock(512, layer_dims[3], stride=2)
        # Reduce the height and width to 1x1 through the Pooling layer
        self.avgpool = layers.GlobalAveragePooling2D()
        # Finally, a fully connected layer performs classification
        self.fc = layers.Dense(num_classes)

    def call(self, inputs, training=None):
        # Through the root network
        x = self.stem(inputs)
        # Pass through the 4 blocks in turn
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        # Through the pool layer
        x = self.avgpool(x)
        # Through full connection layer
        x = self.fc(x)
        return x

    def build_resblock(self, filter_num, blocks, stride=1):
        # Helper: stack `blocks` BasicBlocks with `filter_num` filters
        res_blocks = Sequential()
        # Only the first BasicBlock may use a stride other than 1, implementing downsampling
        res_blocks.add(BasicBlock(filter_num, stride))
        for _ in range(1, blocks):
            # All remaining BasicBlocks use stride 1
            res_blocks.add(BasicBlock(filter_num, stride=1))
        return res_blocks
 
def resnet18():
    # Realize different ResNet by adjusting the number and configuration of basicblocks in the module
    return ResNet([2, 2, 2, 2])
 
def resnet34():
     # Realize different ResNet by adjusting the number and configuration of basicblocks in the module
    return ResNet([3, 4, 6, 3])
import  tensorflow as tf
from    tensorflow.keras import layers, optimizers, datasets, Sequential
import  os
from    resnet import resnet18
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
tf.random.set_seed(2345)
 
def preprocess(x, y):
    # Normalize: map data to [-1, 1]
    x = 2*tf.cast(x, dtype=tf.float32) / 255. - 1
    y = tf.cast(y, dtype=tf.int32) # Type conversion
    return x,y

(x,y), (x_test, y_test) = datasets.cifar10.load_data() # Load dataset
y = tf.squeeze(y, axis=1) # Delete unnecessary dimensions
y_test = tf.squeeze(y_test, axis=1) # Delete unnecessary dimensions
print(x.shape, y.shape, x_test.shape, y_test.shape)
train_db = tf.data.Dataset.from_tensor_slices((x,y)) # Build a training set
# Shuffle randomly, preprocess and batch
train_db = train_db.shuffle(1000).map(preprocess).batch(512)
test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test)) #Build test set
# Preprocess and batch (the test set is not shuffled)
test_db = test_db.map(preprocess).batch(512)
# Take a sample
sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape, tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))

def main():
    # [b, 32, 32, 3] => [b, 1, 1, 512]
    model = resnet18() # ResNet18 network
    model.build(input_shape=(None, 32, 32, 3))
    model.summary() # print network parameter statistics
    optimizer = optimizers.Adam(lr=1e-4) # Build optimizer

    for epoch in range(100): # Training epoch
        for step, (x,y) in enumerate(train_db):
            with tf.GradientTape() as tape:
                # [b, 32, 32, 3] => [b, 10], forward propagation
                logits = model(x, training=True)
                # [b] => [b, 10], one-hot encoding
                y_onehot = tf.one_hot(y, depth=10)
                # Calculate cross entropy
                loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
                loss = tf.reduce_mean(loss)
            # Calculate gradient information
            grads = tape.gradient(loss, model.trainable_variables)
            # Update network parameters
            optimizer.apply_gradients(zip(grads, model.trainable_variables))
            if step %50 == 0:
                print(epoch, step, 'loss:', float(loss))

        total_num = 0
        total_correct = 0
        for x,y in test_db:
            logits = model(x, training=False)
            prob = tf.nn.softmax(logits, axis=1)  #Convert output to probability
            pred = tf.argmax(prob, axis=1)  #Obtain the index value corresponding to the maximum probability value as the category number
            pred = tf.cast(pred, dtype=tf.int32)
            #Check whether the index value (category number) corresponding to the maximum probability value is consistent with the real label category number
            correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
            correct = tf.reduce_sum(correct)
            total_num += x.shape[0]
            total_correct += int(correct)
        acc = total_correct / total_num
        print(epoch, 'acc:', acc)

if __name__ == '__main__':
    main()

import  os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
import  tensorflow as tf
from    tensorflow.keras import layers, optimizers, datasets, Sequential
tf.random.set_seed(2345)

# 5 units of conv + max pooling
conv_layers = [ 
    # unit 1
    layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(64, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
    # unit 2
    layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(128, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
    # unit 3
    layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(256, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
    # unit 4
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same'),
    # unit 5
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.Conv2D(512, kernel_size=[3, 3], padding="same", activation=tf.nn.relu),
    layers.MaxPool2D(pool_size=[2, 2], strides=2, padding='same')
]

def preprocess(x, y):
    # Normalize: map data to [0, 1]
    x = tf.cast(x, dtype=tf.float32) / 255.
    y = tf.cast(y, dtype=tf.int32)
    return x,y
 
(x,y), (x_test, y_test) = datasets.cifar100.load_data()
y = tf.squeeze(y, axis=1)
y_test = tf.squeeze(y_test, axis=1)
print(x.shape, y.shape, x_test.shape, y_test.shape)
train_db = tf.data.Dataset.from_tensor_slices((x,y))
train_db = train_db.shuffle(1000).map(preprocess).batch(128)
test_db = tf.data.Dataset.from_tensor_slices((x_test,y_test))
test_db = test_db.map(preprocess).batch(64)
sample = next(iter(train_db))
print('sample:', sample[0].shape, sample[1].shape, tf.reduce_min(sample[0]), tf.reduce_max(sample[0]))

def main():
    # [b, 32, 32, 3] => [b, 1, 1, 512]
    conv_net = Sequential(conv_layers)
    fc_net = Sequential([
        layers.Dense(256, activation=tf.nn.relu),
        layers.Dense(128, activation=tf.nn.relu),
        layers.Dense(100, activation=None),
    ])
    conv_net.build(input_shape=[None, 32, 32, 3])
    fc_net.build(input_shape=[None, 512])
    optimizer = optimizers.Adam(lr=1e-4)
    # [1, 2] + [3, 4] => [1, 2, 3, 4]
    variables = conv_net.trainable_variables + fc_net.trainable_variables

    for epoch in range(50):
        for step, (x,y) in enumerate(train_db):
            with tf.GradientTape() as tape:
                # [b, 32, 32, 3] => [b, 1, 1, 512]
                out = conv_net(x)
                # flatten, => [b, 512]
                out = tf.reshape(out, [-1, 512])
                # [b, 512] => [b, 100]
                logits = fc_net(out)
                # [b] => [b, 100]
                y_onehot = tf.one_hot(y, depth=100)
                # compute loss
                loss = tf.losses.categorical_crossentropy(y_onehot, logits, from_logits=True)
                loss = tf.reduce_mean(loss)

            grads = tape.gradient(loss, variables)
            optimizer.apply_gradients(zip(grads, variables))
            if step %100 == 0:
                print(epoch, step, 'loss:', float(loss))

        total_num = 0
        total_correct = 0
        for x,y in test_db:
            out = conv_net(x)
            out = tf.reshape(out, [-1, 512])
            logits = fc_net(out)
            prob = tf.nn.softmax(logits, axis=1) #softmax converts the output value to a probability value
            pred = tf.argmax(prob, axis=1)
            pred = tf.cast(pred, dtype=tf.int32)
            correct = tf.cast(tf.equal(pred, y), dtype=tf.int32)
            correct = tf.reduce_sum(correct)
            total_num += x.shape[0]
            total_correct += int(correct)
        acc = total_correct / total_num
        print(epoch, 'acc:', acc)

if __name__ == '__main__':
    main()

Convolution layers for different purposes

Dilated (atrous) convolution

# Simulated input
x = tf.random.normal([1,7,7,1]) 
# Set the dilation_rate parameter, using a 3x3 convolution kernel
layer = layers.Conv2D(1,kernel_size=3,strides=1,dilation_rate=2)
out = layer(x) # Forward computation
out.shape # TensorShape([1, 3, 3, 1])
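
For reference, with dilation_rate=2 a 3x3 kernel covers a 5x5 receptive field, so the 7x7 input above shrinks to 3x3 under the default 'valid' padding. A quick comparison (toy shapes, assumed for illustration):

import tensorflow as tf
from tensorflow.keras import layers

x = tf.random.normal([1, 7, 7, 1])
# ordinary 3x3 convolution: 7 - 3 + 1 = 5, output is [1, 5, 5, 1]
print(layers.Conv2D(1, kernel_size=3, strides=1)(x).shape)
# dilated 3x3 convolution (effective kernel 5x5): 7 - 5 + 1 = 3, output is [1, 3, 3, 1]
print(layers.Conv2D(1, kernel_size=3, strides=1, dilation_rate=2)(x).shape)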

Transposed convolution: conv2d_transpose

Transposed convolution, case 1: (o + 2p - k) is a multiple of s
	1. Notation: o is the height/width of the transposed convolution output, p is the padding, k is the height/width of the kernel, s is the stride, and i is the height/width of the transposed convolution input.
	2. Example: take a single-channel 2x2 feature map (the transposed convolution input), a 3x3 kernel (k = 3), stride s = 2 and padding p = 0. The transposed convolution is computed in three steps:
		1. Step 1: insert s - 1 = 1 zero between neighbouring data points of the input feature map, turning the 2x2 input into a 3x3 matrix.
		2. Step 2: pad k - p - 1 = 3 - 0 - 1 = 2 rows and columns of zeros around that 3x3 matrix, turning it into a 7x7 matrix.
		3. Step 3: run an ordinary convolution over the 7x7 matrix with the same 3x3 kernel, stride 1 (in this step the stride is always 1, regardless of the transposed convolution stride) and padding 0. By the ordinary convolution formula o = ⌊(i + 2*p - k)/s⌋ + 1 (⌈⌉ rounds up, ⌊⌋ rounds down), the output is ⌊(7 + 2*0 - 3)/1⌋ + 1 = 5.
	3. Conclusion: the 5x5 output of step 3 is the output of the transposed convolution, so the relation between its input (2x2) and output (5x5) is
	        o = (i - 1)*s + k - 2*p, valid whenever (o + 2p - k) is a multiple of s.
	        Here 5 + 2*0 - 3 = 2 is a multiple of s = 2, and indeed (2 - 1)*2 + 3 - 2*0 = 5.

# The transposed convolution from the example above can be implemented in TensorFlow as follows:
import tensorflow as tf
# Create an X matrix with a height and width of 5x5
x = tf.range(25)+1 
#<tf.Tensor: id=5, shape=(25,), dtype=int32, numpy=
#array([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
#       18, 19, 20, 21, 22, 23, 24, 25])>
# Reshape into a tensor with a valid 4-D shape
x = tf.reshape(x,[1,5,5,1])
x.shape #TensorShape([1, 5, 5, 1])
x = tf.cast(x, tf.float32)
# Create a convolution kernel matrix with fixed content. The shape is [3, 3, 1, 1], i.e. [k,k, number of input channels, number of output channels]
w = tf.constant([[-1,2,-3.],[4,-5,6],[-7,8,-9]])
w.shape #TensorShape([3, 3])
# Expand dims to obtain a valid 4-D kernel shape
w = tf.expand_dims(w,axis=2) 
w.shape #TensorShape([3, 3, 1])	
w = tf.expand_dims(w,axis=3) 
w.shape #TensorShape([3, 3, 1, 1]), i.e. [k,k, number of input channels, number of output channels]
#The padding argument takes two values: 'VALID' (the default) means no padding is applied and only valid window positions are used;
#'SAME' pads the input so that, with strides=1, the output has the same height and width as the input.
# Carry out general convolution operation
out = tf.nn.conv2d(x,w,strides=2,padding='VALID')
# Output height and width are 2x2
out 
#<tf.Tensor: id=14, shape=(1, 2, 2, 1), dtype=float32, numpy=
#array([[[[ -67.],
#         [ -77.]],
#        [[-117.],
#         [-127.]]]], dtype=float32)>


# Use the 2x2 ([1, 2, 2, 1]) ordinary convolution output as the input of the transposed convolution
# and verify that the transposed convolution output is 5x5 ([1, 5, 5, 1])
xx = tf.nn.conv2d_transpose(out, w, strides=2, padding='VALID', output_shape=[1,5,5,1])
# The output is 5x5 ([1, 5, 5, 1]): the transposed convolution restores the spatial size of the
# ordinary convolution's input, but its values are not equal to that input
xx.shape #TensorShape([1, 5, 5, 1])

#When the input of the ordinary convolution is 5x5 or 6x6, the output is the same 2x2
import tensorflow as tf
# Create a convolution kernel matrix with fixed content. The shape is [3, 3, 1, 1], i.e. [k,k, number of input channels, number of output channels]
w = tf.constant([[-1,2,-3.],[4,-5,6],[-7,8,-9]])
# Tensor adjusted to legal dimension
w = tf.expand_dims(w,axis=2) 
w.shape #TensorShape([3, 3, 1])	
w = tf.expand_dims(w,axis=3) 
w.shape #TensorShape([3, 3, 1, 1]), i.e. [k,k, number of input channels, number of output channels]
x = tf.random.normal([1,6,6,1])
#The padding argument takes two values: 'VALID' (the default) means no padding is applied and only valid window positions are used;
#'SAME' pads the input so that, with strides=1, the output has the same height and width as the input.
# 6x6 input is convoluted
out = tf.nn.conv2d(x,w, strides=2, padding='VALID')
out.shape #TensorShape([1, 2, 2, 1]) #When the height and width of the input are 5x5 or 6x6, the output is the same 2x2
x = tf.random.normal([1,5,5,1])
# 5x5 input is convolved
out = tf.nn.conv2d(x,w, strides=2, padding='VALID')
out.shape #TensorShape([1, 2, 2, 1]) #A 5x5 input and a 6x6 input both yield the same 2x2 output
1. Ordinary convolution formula: o = ⌊(i + 2*p - k)/s⌋ + 1 (⌈⌉ rounds up to the smallest integer not below the value, ⌊⌋ rounds down to the largest integer not above it).
  For example, a single-channel feature map (the ordinary convolution input i) of height/width 5x5 or 6x6, a 3x3 kernel (k = 3), stride s = 2 (greater than 1) and padding p = 0 all give the same 2x2 output o.
  When the stride s is greater than 1, the rounding down means that several different input sizes i map to the same output size o.
  For example, with input i = 5, ⌊(5 + 2*0 - 3)/2⌋ + 1 = 2; with input i = 6, ⌊(6 + 2*0 - 3)/2⌋ + 1 = 2.

2. Transposed convolution, case 2: (o + 2p - k) is not a multiple of s
	1. Notation: o is the height/width of the transposed convolution output, p is the padding, k is the height/width of the kernel, s is the stride, and i is the height/width of the transposed convolution input.
	2. In an ordinary convolution, whenever the stride s is greater than 1, rounding down makes several different input sizes i map to the same output size o.
	  In the transposed convolution we therefore want the same input size i to be able to produce different output sizes o.
	  To do so, a extra rows and a extra columns are appended, so that the transposed convolution can recover whichever original input size of the ordinary convolution is desired.
	3. Example: the ordinary convolution above takes a single-channel 5x5 or 6x6 input, a 3x3 kernel (k = 3), stride s = 2 (greater than 1) and padding p = 0, and produces a 2x2 output o in both cases.
	  We now want the transposed convolution, starting from the 2x2 input, to produce either a 5x5 or a 6x6 output, with a 3x3 kernel, padding p = 0 and stride s = 2. Two steps are needed (see the helper sketch below):
		1. Step 1: a = (o + 2p - k) mod s gives the number of extra rows and columns to append, which is what allows different output sizes o to be recovered from the same input i.
		 	 For a 5x5 transposed convolution output (o = 5): (5 + 2*0 - 3) mod 2 = 0, i.e. append 0 rows and 0 columns.
			 For a 6x6 transposed convolution output (o = 6): (6 + 2*0 - 3) mod 2 = 1, i.e. append 1 row and 1 column.
		2. Step 2: o = (i - 1)*s + k - 2*p + a, using the a computed in step 1, gives the desired transposed convolution output size.
			 For a 5x5 output (o = 5), with a = 0: (2 - 1)*2 + 3 - 2*0 + 0 = 5.
			 For a 6x6 output (o = 6), with a = 1: (2 - 1)*2 + 3 - 2*0 + 1 = 6.
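
The two cases above can be wrapped in a small helper, sketched here purely from the formulas in this section, which computes the ordinary convolution output size, the extra padding a, and checks o = (i-1)*s + k - 2*p + a:

def conv_out_size(i, k, s, p=0):
    # ordinary convolution: o = floor((i + 2p - k) / s) + 1
    return (i + 2 * p - k) // s + 1

def transpose_conv_check(o, k, s, p=0):
    # a: number of extra rows/columns to append so the transposed convolution can reach size o
    a = (o + 2 * p - k) % s
    # i: input size of the transposed convolution (= output size of the ordinary convolution)
    i = conv_out_size(o, k, s, p)
    # verify o = (i - 1)*s + k - 2*p + a
    return a, (i - 1) * s + k - 2 * p + a

print(conv_out_size(5, 3, 2), conv_out_size(6, 3, 2)) # 2 2 -> both inputs give the same 2x2 output
print(transpose_conv_check(5, 3, 2))                  # (0, 5) -> no extra rows/columns needed
print(transpose_conv_check(6, 3, 2))                  # (1, 6) -> append one extra row and column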

 

import tensorflow as tf

# Create a convolution kernel matrix with fixed content. The shape is [3, 3, 1, 1], i.e. [k,k, number of input channels, number of output channels]
w = tf.constant([[-1,2,-3.],[4,-5,6],[-7,8,-9]])
# Tensor adjusted to legal dimension
w = tf.expand_dims(w,axis=2) 
w.shape #TensorShape([3, 3, 1])	
w = tf.expand_dims(w,axis=3) 
w.shape #TensorShape([3, 3, 1, 1]), i.e. [k,k, number of input channels, number of output channels]
x = tf.random.normal([1,6,6,1])
#The padding argument takes two values: 'VALID' (the default) means no padding is applied and only valid window positions are used;
#'SAME' pads the input so that, with strides=1, the output has the same height and width as the input.
# 6x6 input is convoluted
out = tf.nn.conv2d(x,w, strides=2, padding='VALID')
out.shape #TensorShape([1, 2, 2, 1]) #When the height and width of the input are 5x5 or 6x6, the output is the same 2x2
x = tf.random.normal([1,5,5,1])
# 5x5 input is convolved
out = tf.nn.conv2d(x,w, strides=2, padding='VALID') 
out.shape #TensorShape([1, 2, 2, 1]) #A 5x5 input and a 6x6 input both yield the same 2x2 output
 
#Just specify the output size, TensorFlow will automatically deduce the number of rows and columns to be filled
xx = tf.nn.conv2d_transpose(out, w, strides=2, padding='VALID', output_shape=[1,6,6,1])
xx.shape #TensorShape([1, 6, 6, 1])
xx = tf.nn.conv2d_transpose(out, w, strides=2, padding='VALID', output_shape=[1,5,5,1])
xx.shape #TensorShape([1, 5, 5, 1])

# Create X matrix
x = tf.range(25)+1
# Reshape is the tensor of legal dimension
x = tf.reshape(x,[1,5,5,1])
x = tf.cast(x, tf.float32)
# Creating convolution kernel matrix with fixed content
w = tf.constant([[-1,2,-3.],[4,-5,6],[-7,8,-9]])
# Tensor adjusted to legal dimension
w = tf.expand_dims(w,axis=2)
w = tf.expand_dims(w,axis=3)
# Carry out general convolution operation
out = tf.nn.conv2d(x,w,strides=2,padding='VALID')
out
 
# The output of general convolution is used as the input of transposition convolution to perform transposition convolution operation
xx = tf.nn.conv2d_transpose(out, w, strides=2, padding='VALID', output_shape=[1,5,5,1])
#<tf.Tensor: id=117, shape=(5, 5), dtype=float32, numpy=
#array([[   67.,  -134.,   278.,  -154.,   231.],
#       [ -268.,   335.,  -710.,   385.,  -462.],
#       [  586.,  -770.,  1620.,  -870.,  1074.],
#       [ -468.,   585., -1210.,   635.,  -762.],
#       [  819.,  -936.,  1942., -1016.,  1143.]], dtype=float32)>


x = tf.random.normal([1,6,6,1])
# 6x6 input is convoluted
out = tf.nn.conv2d(x,w,strides=2,padding='VALID')
out
#<tf.Tensor: id=21, shape=(1, 2, 2, 1), dtype=float32, numpy=
#array([[[[ 20.438847 ],
#         [ 19.160788 ]],
#        [[  0.8098897],
#         [-28.30303  ]]]], dtype=float32)>

# Restore 6x6 size
xx = tf.nn.conv2d_transpose(out, w, strides=2, padding='VALID', output_shape=[1,6,6,1])
xx

# Create a transposed convolution layer
layer = layers.Conv2DTranspose(1,kernel_size=3,strides=2,padding='VALID')
xx2 = layer(out)
xx2 # output height and width are 5x5, i.e. shape [1, 5, 5, 1]

Keras depthwise separable convolution

Depthwise separable convolution
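
Depthwise separable convolution factorizes a standard convolution into a per-channel (depthwise) 3x3 convolution followed by a 1x1 pointwise convolution, which sharply reduces the parameter count. A minimal sketch using the Keras layer (toy shapes assumed for illustration):

import tensorflow as tf
from tensorflow.keras import layers

x = tf.random.normal([1, 28, 28, 32])

# standard convolution: 3*3*32*64 + 64 = 18,496 parameters
standard = layers.Conv2D(64, kernel_size=3, padding='same')
# depthwise separable convolution: 3*3*32 + 1*1*32*64 + 64 = 2,400 parameters
separable = layers.SeparableConv2D(64, kernel_size=3, padding='same')

print(standard(x).shape, separable(x).shape) # both (1, 28, 28, 64)
print(standard.count_params(), separable.count_params()) # 18496 vs 2400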

Inception

import  os
import  tensorflow as tf
import  numpy as np
from    tensorflow import keras
 
tf.random.set_seed(22)
np.random.seed(22)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')
 
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train, x_test = x_train.astype(np.float32)/255., x_test.astype(np.float32)/255.
# [b, 28, 28] => [b, 28, 28, 1]
x_train, x_test = np.expand_dims(x_train, axis=3), np.expand_dims(x_test, axis=3)
db_train = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(256)
db_test = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(256)
print(x_train.shape, y_train.shape) #(60000, 28, 28, 1) (60000,)
print(x_test.shape, y_test.shape) #(10000, 28, 28, 1) (10000,)
 
class ConvBNRelu(keras.Model):
    def __init__(self, ch, kernelsz=3, strides=1, padding='same'):
        super(ConvBNRelu, self).__init__()
        self.model = keras.models.Sequential([
            keras.layers.Conv2D(ch, kernelsz, strides=strides, padding=padding),
            keras.layers.BatchNormalization(),
            keras.layers.ReLU()
        ])

    def call(self, x, training=None):
        x = self.model(x, training=training)
        return x 

class InceptionBlk(keras.Model):
    def __init__(self, ch, strides=1):
        super(InceptionBlk, self).__init__()
        self.ch = ch
        self.strides = strides
        self.conv1 = ConvBNRelu(ch, strides=strides)
        self.conv2 = ConvBNRelu(ch, kernelsz=3, strides=strides)
        self.conv3_1 = ConvBNRelu(ch, kernelsz=3, strides=strides)
        self.conv3_2 = ConvBNRelu(ch, kernelsz=3, strides=1)
        self.pool = keras.layers.MaxPooling2D(3, strides=1, padding='same')
        self.pool_conv = ConvBNRelu(ch, strides=strides)

    def call(self, x, training=None):
        x1 = self.conv1(x, training=training)
        x2 = self.conv2(x, training=training)
        x3_1 = self.conv3_1(x, training=training)
        x3_2 = self.conv3_2(x3_1, training=training)
        x4 = self.pool(x)
        x4 = self.pool_conv(x4, training=training)
        # concatenate the branch outputs along the channel axis (axis=3)
        x = tf.concat([x1, x2, x3_2, x4], axis=3)
        return x

class Inception(keras.Model):
    def __init__(self, num_layers, num_classes, init_ch=16, **kwargs):
        super(Inception, self).__init__(**kwargs)
        self.in_channels = init_ch
        self.out_channels = init_ch # after each of the num_layers stages, out_channels doubles and the output height and width are halved
        self.num_layers = num_layers
        self.init_ch = init_ch
        self.conv1 = ConvBNRelu(init_ch)
        self.blocks = keras.models.Sequential(name='dynamic-blocks')

        for block_id in range(num_layers):
            for layer_id in range(2):
                if layer_id == 0:
                    # the first block of each stage uses strides=2 to halve the output height and width
                    block = InceptionBlk(self.out_channels, strides=2)
                else:
                    block = InceptionBlk(self.out_channels, strides=1)
                self.blocks.add(block)
            # enlarge out_channels for the next stage
            self.out_channels *= 2 # each stage doubles the number of output channels

        self.avg_pool = keras.layers.GlobalAveragePooling2D()
        self.fc = keras.layers.Dense(num_classes)

    def call(self, x, training=None):
        out = self.conv1(x, training=training)
        out = self.blocks(out, training=training)
        out = self.avg_pool(out)
        out = self.fc(out)
        return out    

# build model and optimizer
batch_size = 32
epochs = 100
model = Inception(2, 10)
# derive the input shape for every layer
model.build(input_shape=(None, 28, 28, 1))
model.summary()
#Model: "inception"
#_________________________________________________________________
#Layer (type)                 Output Shape              Param #
#=================================================================
#conv_bn_relu (ConvBNRelu)    multiple                  224
#_________________________________________________________________
#dynamic-blocks (Sequential)  multiple                  292704
#_________________________________________________________________
#global_average_pooling2d (Gl multiple                  0
#_________________________________________________________________
#dense (Dense)                multiple                  1290
#=================================================================
#Total params: 294,218
#Trainable params: 293,226
#Non-trainable params: 992
#_________________________________________________________________

optimizer = keras.optimizers.Adam(learning_rate=1e-3)
criteon = keras.losses.CategoricalCrossentropy(from_logits=True)
acc_meter = keras.metrics.Accuracy()

for epoch in range(100):
    for step, (x, y) in enumerate(db_train):
        with tf.GradientTape() as tape:
            # print(x.shape, y.shape)
            # [b, 10]
            logits = model(x, training=True)
            # [b] vs [b, 10]
            loss = criteon(tf.one_hot(y, depth=10), logits)
        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        if step % 10 == 0:
            print(epoch, step, 'loss:', loss.numpy())
    acc_meter.reset_states() # reset the accuracy metric
    for x, y in db_test:
        # [b, 10]
        logits = model(x, training=False)
        # [b, 10] => [b]
        pred = tf.argmax(logits, axis=1)
        print(x.shape, y.shape, pred.shape) #(256, 28, 28, 1) (256,) (256,)
        # Compare the label batch y [b] with the prediction batch pred [b]
        acc_meter.update_state(y, pred) # update the accuracy metric
    print(epoch, 'evaluation acc:', acc_meter.result().numpy()) # read the accuracy metric

 
