TensorFlow Learning (4): Building Your Own CNN

The previous posts covered data loading and visualization; this time we turn to the model. The CNN is a deep-learning classic, so without further ado, here are the core building blocks.

Weight Initialization

Initialize the weights from a truncated normal distribution with a small standard deviation, and the biases to a small positive constant, which helps avoid dead ReLU units.

def weight_variable(shape):
    # Small truncated-normal noise breaks symmetry between units
    initial = tf.truncated_normal(shape, dtype=tf.float32, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    # Small positive bias keeps ReLUs active at the start of training
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial, trainable=True)

weights = {
    'wc1': weight_variable([3, 3, 1, 64]),    # 3x3 kernel, 1 input channel, 64 output channels
    'wc2': weight_variable([3, 3, 64, 128]),
    'wd1': weight_variable([7*7*128, 1024]),  # 28x28 input pooled twice -> 7x7, 128 channels
    'wd2': weight_variable([1024, 1024]),
    'out': weight_variable([1024, 10])
}
biases = {
    'bc1': bias_variable([64]),
    'bc2': bias_variable([128]),
    'bd1': bias_variable([1024]),
    'bd2': bias_variable([1024]),
    'out': bias_variable([10])
}

Convolutional Layer

The convolutions here use a stride of 1 with 'SAME' padding, so the output has the same spatial size as the input.

def conv2d(x, W, B, name):
    with tf.name_scope(name) as scope:
        conv = tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
        bias = tf.nn.bias_add(conv, B)
        conv = tf.nn.relu(bias, name=scope)
        return conv

conv1 = conv2d(images, weights['wc1'], biases['bc1'], 'conv1')

TensorFlow's padding argument supports two modes, 'SAME' and 'VALID'. With an input of size W*W, a filter of size F*F, and stride S:

# padding = 'VALID'
new_height = new_width = ceil((W - F + 1) / S)
# padding = 'SAME'
new_height = new_width = ceil(W / S)
pad_needed_height = (new_height - 1) * S + F - W    # total padding, split between top and bottom

When pad_needed_height is odd, TensorFlow puts the extra pixel at the bottom (e.g. 2 pixels on top and 3 on the bottom), whereas Caffe and cuDNN pad symmetrically on both sides. Because of this difference, be careful with 'SAME' mode when converting a Caffe-trained model to TensorFlow!
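
The arithmetic can be checked with a minimal sketch in plain Python (conv_output_size is a hypothetical helper for illustration, not part of TensorFlow):

import math

def conv_output_size(w, f, s, padding):
    # Mirrors TensorFlow's padding arithmetic described above
    if padding == 'VALID':
        return math.ceil((w - f + 1) / s), (0, 0)
    # 'SAME': choose the output size first, then pad as needed
    out = math.ceil(w / s)
    pad_needed = max((out - 1) * s + f - w, 0)
    pad_top = pad_needed // 2            # TensorFlow puts the extra pixel
    pad_bottom = pad_needed - pad_top    # at the bottom (and right)
    return out, (pad_top, pad_bottom)

print(conv_output_size(28, 3, 1, 'SAME'))   # (28, (1, 1))
print(conv_output_size(28, 3, 1, 'VALID'))  # (26, (0, 0))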

Pooling

Pooling comes in two flavors: average pooling (tf.nn.avg_pool) and max pooling (tf.nn.max_pool). Here we do plain, traditional max pooling with a 2x2 window.

def max_pool(x, k, name):
    return tf.nn.max_pool(x, ksize=[1, k, k, 1], strides=[1, k, k, 1], padding='SAME', name=name)

def avg_pool(x, k, name):
    return tf.nn.avg_pool(x, ksize=[1, k, k, 1], strides=[1, k, k, 1], padding='SAME', name=name)

pool1 = max_pool(conv1, k=2, name='pool1')

Normalization

Here we use local response normalization (tf.nn.lrn):

def norm(x, lsize, name):
    return tf.nn.lrn(x, lsize, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name=name)

norm1 = norm(pool1, lsize=4, name='norm1')
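
For reference, tf.nn.lrn (lsize maps to its depth_radius argument) normalizes each position across neighboring channels, per the TensorFlow documentation:

sqr_sum[a, b, c, d] = sum(input[a, b, c, d - depth_radius : d + depth_radius + 1] ** 2)
output = input / (bias + alpha * sqr_sum) ** beta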

Fully Connected Layer

Now add a fully connected layer with 1024 units. Where the pooling layer meets the fully connected layer, the pooled output tensor must be reshaped into a batch of flat vectors (the -1 below lets TensorFlow infer the batch dimension):

pool2_flat = tf.reshape(pool2, [-1, weights['wd1'].get_shape().as_list()[0]])

Multiply by the weight matrix, add the bias, and apply ReLU:

fc1 = tf.nn.relu(tf.matmul(pool2_flat, weights['wd1']) + biases['bd1'])

Dropout

We use a placeholder for the probability that a neuron's output is kept during dropout, so that dropout can be turned on during training and off during testing. Besides masking neuron outputs, tf.nn.dropout automatically rescales the surviving outputs, so no manual scale handling is needed.

keep_prob = tf.placeholder("float")
fc1_drop = tf.nn.dropout(fc1, keep_prob)

Batch Normalization

Batch normalization, from "Batch Normalization: Accelerating Deep Network Training by Reducing Internal Covariate Shift", speeds up training convergence and can achieve good generalization even without dropout or L2 regularization (see the Chinese write-up at http://blog.csdn.net/happynear/article/details/44238541). Concretely, for each batch you compute the mean and standard deviation of that layer's feature maps, normalize the previous layer's output, and then feed it to the next layer.
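
In formulas, for a mini-batch {x_1, ..., x_m} (gamma and beta are learned parameters):

mu_B     = (1/m) * sum_i(x_i)                  # batch mean
sigma2_B = (1/m) * sum_i((x_i - mu_B) ** 2)    # batch variance
x_hat_i  = (x_i - mu_B) / sqrt(sigma2_B + epsilon)
y_i      = gamma * x_hat_i + beta              # scale and shift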

Concretely in TensorFlow, use tf.nn.moments to compute the mean and variance over the axes corresponding to the feature maps, then normalize with tf.nn.batch_normalization; offset is usually initialized to 0 and scale to 1, both with the same shape as mean, and variance_epsilon just needs to be a small number, e.g. 0.001.

One point worth stressing: BN uses different mean and variance values at training time and at test time! The batch statistics above apply during training; at test time, where a single sample is fed through a network with fixed parameters, you pass in the running mean and variance accumulated during training instead. It is the same idea as subtracting a mean image at test time that was computed from the training samples. A good reference implementation is http://r2rt.com/implementing-batch-normalization-in-tensorflow.html

def batch_norm_wrapper(inputs, is_training, decay=0.999, epsilon=0.001):
    scale = tf.Variable(tf.ones([inputs.get_shape()[-1]]))
    beta = tf.Variable(tf.zeros([inputs.get_shape()[-1]]))
    # Running statistics used at test time; not updated by the optimizer
    pop_mean = tf.Variable(tf.zeros([inputs.get_shape()[-1]]), trainable=False)
    pop_var = tf.Variable(tf.ones([inputs.get_shape()[-1]]), trainable=False)

    if is_training:
        # Axes [0] for fully connected layers; use [0, 1, 2] for conv layers (NHWC)
        batch_mean, batch_var = tf.nn.moments(inputs, [0])
        train_mean = tf.assign(pop_mean, pop_mean * decay + batch_mean * (1 - decay))
        train_var = tf.assign(pop_var, pop_var * decay + batch_var * (1 - decay))
        with tf.control_dependencies([train_mean, train_var]):
            return tf.nn.batch_normalization(inputs, batch_mean, batch_var, beta, scale, epsilon)
    else:
        return tf.nn.batch_normalization(inputs, pop_mean, pop_var, beta, scale, epsilon)

bn_z1 = batch_norm_wrapper(z1, is_training)

Output Layer

A plain output layer (raw logits):

out = tf.matmul(fc2_drop, weights['out']) + biases['out']

Or with a softmax layer:

out = tf.nn.softmax(tf.matmul(fc2_drop, weights['out']) + biases['out'])
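
Here softmax turns the logits into a probability distribution over the classes:

softmax(z)_i = exp(z_i) / sum_j(exp(z_j))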

Training

Define the loss function. Be careful about whether the output layer already applied softmax; do not apply it twice. The loss used here is the cross-entropy between the true labels y and the predictions y_pred. Note that tf.reduce_sum sums the cross-entropy over every image in the minibatch.

y_pred = tf.nn.softmax(pred)
cost = -tf.reduce_sum(y * tf.log(y_pred))

Or, more conveniently (and more numerically stable), call the built-in function:

cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=pred, labels=y))

TensorFlow offers a large number of gradient-based optimization algorithms for minimizing the cross-entropy, for example:

# plain stochastic gradient descent
optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate).minimize(cost)
# or Adam
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)

Training updates the parameters by running optimizer repeatedly:

with tf.Session() as sess:
    sess.run(init)
    for i in range(1000):
        batch = mnist.train.next_batch(50)
        sess.run(optimizer, feed_dict={x: batch[0], y: batch[1], keep_prob: dropout})

TensorFlow's Session and InteractiveSession work differently. With the former, you build the whole graph object first, execute all operations via run(), and release resources with close(). The latter is more convenient for interactive work: you can use Tensor.eval() and Operation.run() in place of Session.run(). Algorithms like DQN, which interact with the environment at every iteration, therefore use InteractiveSession.
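
A minimal sketch of the InteractiveSession style (the tensors here are purely illustrative):

sess = tf.InteractiveSession()
a = tf.constant(2.0)
b = tf.constant(3.0)
c = a * b
tf.global_variables_initializer().run()  # Operation.run() instead of sess.run(op)
print(c.eval())                          # Tensor.eval() instead of sess.run(c)
sess.close()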

Evaluating the Model

tf.argmax returns the index of the largest value in the output layer; comparing it with the label and averaging over all test samples gives the accuracy on the test set. Note that on the test set the dropout keep probability is set to 1, i.e. dropout is disabled.

correct_pred = tf.equal(tf.argmax(pred,1), tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
acc = sess.run(accuracy, feed_dict={x: batch[0], y: batch[1], keep_prob: 1.})

Complete training code for MyNet

from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

import tensorflow as tf
# Parameters
learning_rate = 0.001
training_iters = 200000
batch_size = 64
display_step = 20

# Network Parameters
n_input = 784 # MNIST data input (img shape: 28*28)
n_classes = 10 # MNIST total classes (0-9 digits)
dropout = 0.8 # Dropout, probability to keep units

# tf Graph input
x = tf.placeholder(tf.float32, [None, n_input])
y = tf.placeholder(tf.float32, [None, n_classes])
keep_prob = tf.placeholder(tf.float32) # dropout (keep probability)

def weight_variable(shape):
    initial = tf.truncated_normal(shape, dtype=tf.float32, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial, trainable=True)

def conv2d(x, W, B, name):
    with tf.name_scope(name) as scope:
        conv = tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
        bias = tf.nn.bias_add(conv, B)
        conv = tf.nn.relu(bias, name=scope)
        return conv

def max_pool(x, k, name):
    return tf.nn.max_pool(x, ksize=[1, k, k, 1], strides=[1, k, k, 1], padding='SAME', name=name)

def avg_pool(x, k, name):
    return tf.nn.avg_pool(x, ksize=[1, k, k, 1], strides=[1, k, k, 1], padding='SAME', name=name)

def norm(x, lsize, name):
    return tf.nn.lrn(x, lsize, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name=name)

def my_net(_x, _weights, _biases, _dropout):
    _x = tf.reshape(_x, shape=[-1, 28, 28, 1])

    conv1 = conv2d(_x, _weights['wc1'], _biases['bc1'], 'conv1')
    pool1 = max_pool(conv1, k=2, name='pool1')
    norm1 = norm(pool1, lsize=4, name='norm1')

    conv2 = conv2d(norm1, _weights['wc2'], _biases['bc2'], 'conv2')
    pool2 = max_pool(conv2, k=2, name='pool2')
    norm2 = norm(pool2, lsize=4, name='norm2')

    pool2_flat = tf.reshape(norm2, [-1, _weights['wd1'].get_shape().as_list()[0]])
    fc1 = tf.nn.relu(tf.matmul(pool2_flat, _weights['wd1']) + _biases['bd1'])
    fc1_drop = tf.nn.dropout(fc1, _dropout)

    fc2 = tf.nn.relu(tf.matmul(fc1_drop, _weights['wd2']) + _biases['bd2'])
    fc2_drop = tf.nn.dropout(fc2, _dropout)

    out = tf.matmul(fc2_drop, _weights['out']) + _biases['out']
    return out

weights = {
    'wc1': weight_variable([3, 3, 1, 64]),
    'wc2': weight_variable([3, 3, 64, 128]),
    'wd1': weight_variable([7*7*128, 1024]),
    'wd2': weight_variable([1024, 1024]),
    'out': weight_variable([1024, n_classes])
}
biases = {
    'bc1': bias_variable([64]),
    'bc2': bias_variable([128]),
    'bd1': bias_variable([1024]),
    'bd2': bias_variable([1024]),
    'out': bias_variable([n_classes])
}

# Construct model
pred = my_net(x, weights, biases, keep_prob)

# Define loss and optimizer
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=pred, labels=y))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)

# Evaluate model
correct_pred = tf.equal(tf.argmax(pred,1), tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

# Initializing the variables
init = tf.global_variables_initializer()

# Launch the graph
with tf.Session() as sess:
    sess.run(init)
    step = 1
    # Keep training until reaching max iterations
    while step * batch_size < training_iters:
        batch = mnist.train.next_batch(batch_size)
        # Fit training using batch data
        sess.run(optimizer, feed_dict={x: batch[0], y: batch[1], keep_prob: dropout})
        if step % display_step == 0:
            # Calculate batch accuracy
            acc = sess.run(accuracy, feed_dict={x: batch[0], y: batch[1], keep_prob: 1.})
            # Calculate batch loss
            loss = sess.run(cost, feed_dict={x: batch[0], y: batch[1], keep_prob: 1.})
            print "Iter " + str(step*batch_size) + ", Minibatch Loss= " + "{:.6f}".format(loss) + ", Training Accuracy= " + "{:.5f}".format(acc)
        step += 1
    print "Optimization Finished!"
    # Calculate accuracy for 256 mnist test images
    print "Testing Accuracy:", sess.run(accuracy, feed_dict={x: mnist.test.images[:256], y: mnist.test.labels[:256], keep_prob: 1.})