We covered loading and visualizing the data in a previous post; this time it is the model's turn. The CNN is a classic of deep learning, so without further ado, let's go straight to the core building blocks.
Weight Initialization
def weight_variable(shape):
    # Truncated normal keeps initial weights within two standard deviations of 0.
    initial = tf.truncated_normal(shape, dtype=tf.float32, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    # A small positive bias helps avoid "dead" ReLU units at the start.
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial, trainable=True)
weights = {
    'wc1': weight_variable([3, 3, 1, 64]),
    'wc2': weight_variable([3, 3, 64, 128]),
    'wd1': weight_variable([7*7*128, 1024]),
    'wd2': weight_variable([1024, 1024]),
    'out': weight_variable([1024, n_classes])
}
biases = {
    'bc1': bias_variable([64]),
    'bc2': bias_variable([128]),
    'bd1': bias_variable([1024]),
    'bd2': bias_variable([1024]),
    'out': bias_variable([n_classes])
}
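As a quick sanity check on the shape of 'wd1', here is a sketch of the arithmetic (mine, not from the original), assuming the 28x28 MNIST inputs and the two 2x2 poolings used below:

# 'SAME' convolutions preserve spatial size; each stride-2 pooling halves it:
# 28x28 -> pool1 -> 14x14 -> pool2 -> 7x7, with 128 channels coming out of 'wc2',
# so the flattened vector fed into 'wd1' has 7 * 7 * 128 = 6272 elements.
flat_dim = (28 // 2 // 2) * (28 // 2 // 2) * 128   # = 6272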
Convolution Layer
The convolutions here use a stride of 1 with zero padding ('SAME'), so the output has the same spatial size as the input.
def conv2d(x, W, B, name):
    with tf.name_scope(name) as scope:
        # Stride 1 in every dimension; 'SAME' zero-padding preserves spatial size.
        conv = tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
        bias = tf.nn.bias_add(conv, B)
        conv = tf.nn.relu(bias, name=scope)
        return conv
conv1 = conv2d(images, weights['wc1'], biases['bc1'], 'conv1')
TensorFlow's padding parameter supports two modes, SAME and VALID. Let the input be W*W, the filter F*F, and the stride S. Then:
# padding = 'VALID'
new_height = new_width = ceil((W - F + 1) / S)
# padding = 'SAME'
new_height = new_width = ceil(W / S)
pad_needed = (new_height - 1) * S + F - W   # total padding, split between the two sides
When the total padding needed is odd, TensorFlow puts the extra pixel on the bottom/right (e.g. a total of 5 becomes 2 pixels on top and 3 on the bottom), whereas Caffe and cuDNN pad the same amount on both sides (e.g. 2 pixels each). Because of this difference, be careful with 'SAME' mode when converting a Caffe-trained model to TensorFlow!
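To make the formulas concrete, here is a small plain-Python sketch (the helper name is mine) that computes the output size along one spatial dimension:

import math

def conv_output_size(W, F, S, padding):
    # Output size along one spatial dimension, per the formulas above.
    if padding == 'VALID':
        return math.ceil((W - F + 1) / S)
    else:  # 'SAME'
        return math.ceil(W / S)

print(conv_output_size(28, 3, 1, 'SAME'))    # 28 -- input size preserved
print(conv_output_size(28, 3, 1, 'VALID'))   # 26
# Total 'SAME' padding needed: (28 - 1) * 1 + 3 - 28 = 2, i.e. 1 pixel per side.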
Pooling
Pooling comes in two flavors: average pooling (tf.nn.avg_pool) and max pooling (tf.nn.max_pool). Here we use plain, traditional max pooling with a 2x2 window.
def max_pool(x, k, name):
    # Note: the window size follows k, but the stride is fixed at 2 here.
    return tf.nn.max_pool(x, ksize=[1, k, k, 1], strides=[1, 2, 2, 1], padding='SAME', name=name)

def avg_pool(x, k, name):
    return tf.nn.avg_pool(x, ksize=[1, k, k, 1], strides=[1, 2, 2, 1], padding='SAME', name=name)
pool1 = max_pool(conv1, k=2, name='pool1')
Normalization
def norm(x, lsize, name):
    return tf.nn.lrn(x, lsize, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name=name)
norm1 = norm(pool1, lsize=4, name='norm1')
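For reference, tf.nn.lrn (local response normalization) divides each activation by a power of the sum of squares over neighboring channels; roughly, per the TensorFlow documentation, with depth_radius = lsize:

# sqr_sum[a, b, c, d] = sum(input[a, b, c, d - lsize : d + lsize + 1] ** 2)
# output = input / (bias + alpha * sqr_sum) ** beta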
Fully Connected Layer
Now add a 1024-unit fully connected layer. Where the pooling layer meets the fully connected layer, the pooled output tensor has to be reshaped into a batch of flat vectors:
pool2_flat = tf.reshape(pool2, [-1, weights['wd1'].get_shape().as_list()[0]])
Multiply by the weight matrix, add the bias, and apply ReLU:
fc1 = tf.nn.relu(tf.matmul(pool2_flat, weights['wd1']) + biases['bd1'])
Dropout
Use a placeholder for the probability that a neuron's output is kept during dropout, so dropout can be enabled during training and disabled during testing. Besides masking neuron outputs, the tf.nn.dropout op automatically rescales the surviving outputs, so no manual scaling is needed when using dropout.
keep_prob = tf.placeholder("float")
fc1_drop = tf.nn.dropout(fc1, keep_prob)
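A tiny sketch of the automatic rescaling (the exact output varies with the random mask): each kept entry is divided by keep_prob, so the expected value of the output matches the input.

with tf.Session() as demo_sess:
    dropped = tf.nn.dropout(tf.ones([10]), keep_prob=0.5)
    # Kept entries become 1 / 0.5 = 2.0, dropped ones become 0.0,
    # so the expected mean of the output stays 1.0.
    print(demo_sess.run(dropped))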
Batch Normalization
Batch normalization, from the paper "Batch Normalization: Accelerating Deep Network Training by Reducing Internal Covariate Shift", speeds up training convergence and can achieve good generalization even without dropout or L2 regularization (a Chinese write-up: http://blog.csdn.net/happynear/article/details/44238541). Concretely, for each batch compute the mean and variance over all feature maps at that layer, normalize the previous layer's output, and then feed it into the next layer.
In TensorFlow specifically, use tf.nn.moments to compute the mean and variance over the axes corresponding to the feature-map dimensions, then normalize with tf.nn.batch_normalization. offset is usually initialized to 0 and scale to 1 (both have the same shape as mean), and variance_epsilon just needs to be a very small number, e.g. 0.001.
One point worth stressing: BN uses different mean/variance statistics during training and testing! The batch_normalization above is correct during training; at test time, where a single sample may be fed through a network whose parameters are fixed, pass in the population mean and variance accumulated during training. It is much like subtracting a mean image at test time that was computed from the training samples. A good reference implementation: http://r2rt.com/implementing-batch-normalization-in-tensorflow.html
def batch_norm_wrapper(inputs, is_training, decay=0.999, epsilon=1e-3):
    scale = tf.Variable(tf.ones([inputs.get_shape()[-1]]))
    beta = tf.Variable(tf.zeros([inputs.get_shape()[-1]]))
    # Population statistics, updated by moving average; not touched by the optimizer.
    pop_mean = tf.Variable(tf.zeros([inputs.get_shape()[-1]]), trainable=False)
    pop_var = tf.Variable(tf.ones([inputs.get_shape()[-1]]), trainable=False)
    if is_training:
        batch_mean, batch_var = tf.nn.moments(inputs, [0])
        train_mean = tf.assign(pop_mean, pop_mean * decay + batch_mean * (1 - decay))
        train_var = tf.assign(pop_var, pop_var * decay + batch_var * (1 - decay))
        with tf.control_dependencies([train_mean, train_var]):
            return tf.nn.batch_normalization(inputs, batch_mean, batch_var, beta, scale, epsilon)
    else:
        return tf.nn.batch_normalization(inputs, pop_mean, pop_var, beta, scale, epsilon)
bn_z1 = batch_norm_wrapper(z1, is_training)
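Here z1 stands for a pre-activation, and is_training is a plain Python bool fixed at graph-construction time, so (following the r2rt.com post) one graph is built for training and a second one, with is_training=False, is rebuilt for inference from the saved variables. A hypothetical sketch:

# Illustrative names only, not part of the MyNet model above.
z1 = tf.matmul(pool2_flat, weights['wd1'])        # no bias term: BN's beta replaces it
bn_z1 = batch_norm_wrapper(z1, is_training=True)  # training-time graph
fc1 = tf.nn.relu(bn_z1)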
Output Layer
A plain output layer (raw logits):
out = tf.matmul(fc2_drop, weights['out']) + biases['out']
With a softmax layer:
out = tf.nn.softmax(tf.matmul(fc2_drop, weights['out']) + biases['out'])
Training
Define the loss function. Watch out for whether softmax was already applied in the output layer; do not apply it twice. The loss used here is the cross-entropy between the ground truth y and the prediction y_pred. Note that tf.reduce_sum adds up the cross-entropy of every image in the minibatch.
y_pred = tf.nn.softmax(pred)
cost = -tf.reduce_sum(y * tf.log(y_pred))
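One caveat with this hand-rolled version: if y_pred contains exact zeros, tf.log yields -inf and the loss becomes NaN. A common hedge is to clip before taking the log:

# Clip predictions away from 0 so tf.log stays finite (a defensive sketch):
cost = -tf.reduce_sum(y * tf.log(tf.clip_by_value(y_pred, 1e-10, 1.0)))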
Or, more conveniently (and more numerically stable), call the built-in function:
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=pred, labels=y))
TensorFlow ships with a large number of gradient-based optimization algorithms for minimizing the cross-entropy:
optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate).minimize(cost)
# or, alternatively:
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)
The model is trained by repeatedly running the optimizer to update the parameters:
with tf.Session() as sess:
    sess.run(init)
    for i in range(1000):
        batch = mnist.train.next_batch(50)
        sess.run(optimizer, feed_dict={x: batch[0], y: batch[1], keep_prob: dropout})
TensorFlow's Session and InteractiveSession work differently. With the former, you build the complete graph, execute all operations through run(), and release resources with close(). The latter is more convenient for interactive use: Tensor.eval() and Operation.run() can be used in place of Session.run(). Algorithms like DQN, which need to interact at every iteration, use InteractiveSession.
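A minimal sketch of the InteractiveSession style, assuming the same graph as above:

sess = tf.InteractiveSession()
tf.initialize_all_variables().run()   # Operation.run() instead of sess.run(init)
batch = mnist.train.next_batch(50)
optimizer.run(feed_dict={x: batch[0], y: batch[1], keep_prob: dropout})
print(accuracy.eval(feed_dict={x: batch[0], y: batch[1], keep_prob: 1.}))  # Tensor.eval()
sess.close()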
Evaluating the Model
Use tf.argmax to find the index of the largest value in the output layer, and average the per-sample correctness over all test samples to get the test-set accuracy. Note that on the test set the dropout keep probability is set to 1, i.e. dropout is disabled.
correct_pred = tf.equal(tf.argmax(pred,1), tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
acc = sess.run(accuracy, feed_dict={x: batch[0], y: batch[1], keep_prob: 1.})
Complete training code for the MyNet network
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
learning_rate = 0.001
training_iters = 200000
batch_size = 64
display_step = 20
n_input = 784
n_classes = 10
dropout = 0.8
x = tf.placeholder(tf.float32, [None, n_input])
y = tf.placeholder(tf.float32, [None, n_classes])
keep_prob = tf.placeholder(tf.float32)
def weight_variable(shape):
    initial = tf.truncated_normal(shape, dtype=tf.float32, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial, trainable=True)

def conv2d(x, W, B, name):
    with tf.name_scope(name) as scope:
        conv = tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
        bias = tf.nn.bias_add(conv, B)
        conv = tf.nn.relu(bias, name=scope)
        return conv

def max_pool(x, k, name):
    return tf.nn.max_pool(x, ksize=[1, k, k, 1], strides=[1, 2, 2, 1], padding='SAME', name=name)

def avg_pool(x, k, name):
    return tf.nn.avg_pool(x, ksize=[1, k, k, 1], strides=[1, 2, 2, 1], padding='SAME', name=name)

def norm(x, lsize, name):
    return tf.nn.lrn(x, lsize, bias=1.0, alpha=0.001 / 9.0, beta=0.75, name=name)
def my_net(_x, _weights, _biases, _dropout):
    # Reshape the flat 784-vector back into a 28x28 single-channel image.
    _x = tf.reshape(_x, shape=[-1, 28, 28, 1])
    conv1 = conv2d(_x, _weights['wc1'], _biases['bc1'], 'conv1')
    pool1 = max_pool(conv1, k=2, name='pool1')
    norm1 = norm(pool1, lsize=4, name='norm1')
    conv2 = conv2d(norm1, _weights['wc2'], _biases['bc2'], 'conv2')
    pool2 = max_pool(conv2, k=2, name='pool2')
    norm2 = norm(pool2, lsize=4, name='norm2')
    # Flatten: 28x28 -> 14x14 -> 7x7 after the two poolings, with 128 channels.
    pool2_flat = tf.reshape(norm2, [-1, _weights['wd1'].get_shape().as_list()[0]])
    fc1 = tf.nn.relu(tf.matmul(pool2_flat, _weights['wd1']) + _biases['bd1'])
    fc1_drop = tf.nn.dropout(fc1, _dropout)
    fc2 = tf.nn.relu(tf.matmul(fc1_drop, _weights['wd2']) + _biases['bd2'])
    fc2_drop = tf.nn.dropout(fc2, _dropout)
    out = tf.matmul(fc2_drop, _weights['out']) + _biases['out']
    return out
weights = {
    'wc1': weight_variable([3, 3, 1, 64]),
    'wc2': weight_variable([3, 3, 64, 128]),
    'wd1': weight_variable([7*7*128, 1024]),
    'wd2': weight_variable([1024, 1024]),
    'out': weight_variable([1024, n_classes])
}
biases = {
    'bc1': bias_variable([64]),
    'bc2': bias_variable([128]),
    'bd1': bias_variable([1024]),
    'bd2': bias_variable([1024]),
    'out': bias_variable([n_classes])
}
pred = my_net(x, weights, biases, keep_prob)
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=pred, labels=y))
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)
correct_pred = tf.equal(tf.argmax(pred,1), tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
init = tf.initialize_all_variables()
with tf.Session() as sess:
    sess.run(init)
    step = 1
    while step * batch_size < training_iters:
        batch = mnist.train.next_batch(batch_size)
        sess.run(optimizer, feed_dict={x: batch[0], y: batch[1], keep_prob: dropout})
        if step % display_step == 0:
            # Evaluate on the current batch with dropout disabled (keep_prob = 1).
            acc = sess.run(accuracy, feed_dict={x: batch[0], y: batch[1], keep_prob: 1.})
            loss = sess.run(cost, feed_dict={x: batch[0], y: batch[1], keep_prob: 1.})
            print("Iter " + str(step*batch_size) + ", Minibatch Loss= " + "{:.6f}".format(loss) + ", Training Accuracy= " + "{:.5f}".format(acc))
        step += 1
    print("Optimization Finished!")
    print("Testing Accuracy:", sess.run(accuracy, feed_dict={x: mnist.test.images[:256], y: mnist.test.labels[:256], keep_prob: 1.}))