使用SVM识别手写数字

调用第三方库实现

在此我选用的是sk-learn的关于svm的库,其关于此次实验的svm函数定义为:svm.SVC(C=8.0, kernel='rbf', gamma=0.1)svm.SVC()函数的几个重要超参在其官方的介绍文档中有如下的解释:

  • C :误差项的惩罚参数,浮点型,可选 (默认=1.0);
  • kernel : 指定核函数类型,字符型,可选 (默认=‘rbf’),如果使用自定义的核函数,需要预先计算核矩阵;
  • gamma : 浮点型, 可选 (默认=0.0),’rbf’核函数的系数,需要注意的是,此处的gamma与课本中的sigma是互为倒数的关系(所以其可以为0)。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Dec 1 13:30:21 2016
@author: kuangmeng
使用SVM分类器,从MNIST数据集中进行手写数字识别的分类程序
"""
import cPickle
import gzip
from sklearn import svm
import time
def load_data():
"""
返回包含训练数据、验证数据、测试数据的元组的模式识别数据
"""
f = gzip.open('data.gz', 'rb')
training_data, validation_data, test_data = cPickle.load(f)
f.close()
return (training_data, validation_data, test_data)
def Svm():
print ("开始时间:",time.strftime('%Y-%m-%d %H:%M:%S'))
training_data, validation_data, test_data = load_data()
# 传递训练模型的参数,这里用默认的参数
clf = svm.SVC(C=10.0, kernel='rbf', gamma=0.10,cache_size=8000,probability=False)
# 进行模型训练
clf.fit(training_data[0], training_data[1])
# 测试集测试预测结果
predictions = [int(a) for a in clf.predict(test_data[0])]
num_correct = sum(int(a == y) for a, y in zip(predictions, test_data[1]))
print ("%s 中的 %s 测试正确。" % (num_correct, len(test_data[1])))
print ("结束时间:",time.strftime('%Y-%m-%d %H:%M:%S'))
if __name__ == "__main__":
Svm()

自己编程实现

在自己编程实现过程中,我也借鉴了很多其他人写的很成熟的方案,最终从数据结构、逻辑结构以及特征计算等方面得到比较合理的一组答案。

逻辑结构

本次实验的程序分为“训练”和“测试”两部分,两部分分别进行的工作如下:

训练

  • 加载数据
  • 初始化模型
  • 更新标签
  • 初始化预测误差
  • 迭代每个样本(用KT优化)
  • 得到每个样本的模型

对步骤5的解释:

对于svm我们要求解a(数组),如果 a的所有分量满足svm对偶问题的KKT条件,那么这个问题的解就求出来了,我们svm模型学习也就完成了。如果没有满足KKT,那么我们就在 a中找两个分量 ai和 aj,其中 ai 是违反KKT条件最严重的分量,通过计算,使得 ai 和 aj满足KKT条件,直到a的所有分量都满足KKT条件。而且这个计算过程是收敛的,因为每次计算出来的新的两个分量,使得对偶问题中要优化的目标函数值更小。因为每次求解的那两个分量,是要优化问题在这两个分量上的极小值,所以每一次优化,都会使目标函数比上一次的优化结果的值变小。

测试

  • 加载数据
  • 对每个数据预测
  • 计算正确率与相关信息逻辑结构

特征计算

仿照KKT的优化方法,在本次试验中,我将每张图片作为一个数据。由此得到对每一个测试样本的预测(如果在某个分类的计算时结果为正,则说明该测试样本属于该类别,结果为0则不属于此类别)。

其他杂项

  • 核函数选择:按照传统,选择的是RBF核函数,函数形式与教材完全相同;

  • 数据来源:MNIST(http://yann.lecun.com/exdb/mnist/)

  • SMO优化算法:

    • 取初始值a(0)=0,令K=0;
    • 选取优化变量a1(k) , a2(k) , 针对优化问题,求得最优解 a1(k+1) , a2(k+1) 更新 a(k) 为 a(k+1) ;
    • 在精度条件范围内是否满足停机条件,即是否有变量违反KKT条件,如果违反了,则令k=k+1,跳转2,否则4;
    • 求得近似解â =a(k+1)

    其中第3步中,是否违反KKT条件,对于a(k)的每个分量按照以下的违反KKT条件的公式进行验算即可。变量选取分为两步,第一步是选取违反KKT条件最严重的ai,第二步是根据已经选取的第一个变量,选择优化程度最大的第二个变量。违反KKT条件最严重的变量可以按照这样的规则选取,首先看0<ai<C的那些分量中,是否有违反KKT条件的,如果有,则选取yig(xi)最小的那个做为a1。如果没有则遍历所有的样本点,在违反KKT条件的分量中选取yig(xi)最小的做为a1。当选择了a1后,如果a1对应的E1为正,选择Ei最小的那个分量最为a2,如果E1为负,选择Ei最大的那个分量最为a2,这是因为anew2依赖于|E1−E2|。 如果选择的a2,不能满足下降的最小步长,那么就遍历所有的支持向量点做为a2进行试用,如果仍然都不能满足下降的最小步长,那么就遍历所有的样本点做为a2试用。如果还算是不能满足下降的最小步长,那么就重新选择a1。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Dec 4 14:24:53 2016
@author: kuangmeng
"""
import time
import os
import math
class model:
def __init__(self):
self.a = []
self.b = 0.0
class DATA:
def __init__(self):
self.samples = [] # 样本数据
self.tests = [] # 测试数据
self.models = [] # 训练的模型
self.forecasterror = [] # 预测知与真实y之差Ei
self.modelnum = 0 # 当前正使用或训练的模型
self.cache= [] # 缓存kernel函数的计算结果
self.sigma = 10 # sigma
def init_models(self):
for i in range(0, 10):
m = model()
for j in range(len(self.samples)):
m.a.append(0)
self.models.append(m)
def init_cache(self):
i = 0
for x in self.samples:
print ("正在计算第",i+1,"个样本的RBF核")
self.cache.append([])
j = 0
for z in self.samples:
if i > j:
self.cache[i].append(self.cache[j][i])
else:
self.cache[i].append(RBF(x,z))
j += 1
i += 1
class image:
def __init__(self):
self.data = []
self.num = 0
self.label = []
self.filename = ""
gv = DATA()
# RBF核函数
def RBF(j, i):
if j == i:
return math.exp(0)
sigma = gv.sigma
ret = 0.0
for m in range(len(j.data)):
for n in range(len(j.data[m])):
ret += math.pow(int(j.data[m][n]) - int(i.data[m][n]), 2)
ret = math.exp(-ret/sigma)
return ret
#加载测试与训练数据
def loaddata(dirpath, name):
files = os.listdir(dirpath)
for file in files:
img = image()
img.data = images(dirpath + file)
img.num = int(file[0])
img.filename = file
name.append(img)
#图片分列
def images(path):
img = []
file = open(path, "r")
for line in file:
line = line[:-2]
img.append(line)
return img
#更新样本标签,正在训练啥就将啥的标签定为1,其他的定为-1
def update_samples_label(num):
for img in gv.samples:
if img.num == num:
img.label.append(1)
else:
img.label.append(-1)
#初始化DATA.forecasterror
def init_forecasterror():
gv.forecasterror = []
for i in range(len(gv.samples)):
diff = 0.0
for j in range(len(gv.samples)):
if gv.models[gv.modelnum].a[j] != 0:
diff += gv.models[gv.modelnum].a[j] * gv.samples[j].label[gv.modelnum] * gv.cache[j][i]
diff += gv.models[gv.modelnum].b
diff -= gv.samples[i].label[gv.modelnum]
gv.forecasterror.append(diff)
#更新DATA.forecasterror
def update_forecasterror(i, new_ai, j, new_bj, new_b):
for idx in range(len(gv.samples)):
diff = (new_ai - gv.models[gv.modelnum].a[i])* gv.samples[i].label[gv.modelnum] * gv.cache[i][idx]
diff += (new_bj - gv.models[gv.modelnum].a[j])* gv.samples[j].label[gv.modelnum] * gv.cache[j][idx]
diff += new_b - gv.models[gv.modelnum].b
diff += gv.forecasterror[idx]
gv.forecasterror[idx] = diff
# g(x)
def predict(m):
pred = 0.0
for j in range(len(gv.samples)):
if gv.models[gv.modelnum].a[j] != 0:
pred += gv.models[gv.modelnum].a[j] * gv.samples[j].label[gv.modelnum] * RBF(gv.samples[j],m)
pred += gv.models[gv.modelnum].b
return pred
def save_models():
for i in range(10):
fn = open("models/" + str(i) + "_a.model", "w")
for ai in gv.models[i].a:
fn.write(str(ai))
fn.write('\n')
fn.close()
fn = open("models/" + str(i) + "_b.model", "w")
fn.write(str(gv.models[i].b))
fn.close()
def load_models():
for i in range(10):
fn = open("models/" + str(i) + "_a.model", "r")
j = 0
for line in fn:
gv.models[i].a[j] = float(line)
j += 1
fn.close()
fn = open("models/" + str(i) + "_b.model", "r")
gv.models[i].b = float(fn.readline())
fn.close()
###
# T: tolerance 误差容忍度(精度)
# times: 迭代次数
# 优化方法:SMO
# C: 惩罚系数
# modelnum: 模型序号09
# step: aj移动的最小步长
###
def train(T, times, C, modelnum, step):
time = 0
gv.modelnum = modelnum
update_samples_label(modelnum)
init_forecasterror()
updated = True
while time < times and updated:
updated = False
time += 1
for i in range(len(gv.samples)):
ai = gv.models[gv.modelnum].a[i]
Ei = gv.forecasterror[i]
#计算违背KKT的点
if (gv.samples[i].label[gv.modelnum] * Ei < -T and ai < C) or (gv.samples[i].label[gv.modelnum] * Ei > T and ai > 0):
for j in range(len(gv.samples)):
if j == i: continue
kii = gv.cache[i][i]
kjj = gv.cache[j][j]
kji = kij = gv.cache[i][j]
eta = kii + kjj - 2 * kij
if eta <= 0: continue
new_aj = gv.models[gv.modelnum].a[j] + gv.samples[j].label[gv.modelnum] * (gv.forecasterror[i] - gv.forecasterror[j]) / eta # f 7.106
L = 0.0
H = 0.0
a1_old = gv.models[gv.modelnum].a[i]
a2_old = gv.models[gv.modelnum].a[j]
if gv.samples[i].label[gv.modelnum] == gv.samples[j].label[gv.modelnum]:
L = max(0, a2_old + a1_old - C)
H = min(C, a2_old + a1_old)
else:
L = max(0, a2_old - a1_old)
H = min(C, C + a2_old - a1_old)
if new_aj > H:
new_aj = H
if new_aj < L:
new_aj = L
if abs(a2_old - new_aj) < step:
# print ("j = %d, is not moving enough" % j)
continue
new_ai = a1_old + gv.samples[i].label[gv.modelnum] * gv.samples[j].label[gv.modelnum] * (a2_old - new_aj) # f 7.109
new_b1 = gv.models[gv.modelnum].b - gv.forecasterror[i] - gv.samples[i].label[gv.modelnum] * kii * (new_ai - a1_old) - gv.samples[j].label[gv.modelnum] * kji * (new_aj - a2_old) # f7.115
new_b2 = gv.models[gv.modelnum].b - gv.forecasterror[j] - gv.samples[i].label[gv.modelnum]*kji*(new_ai - a1_old) - gv.samples[j].label[gv.modelnum]*kjj*(new_aj-a2_old) # f7.116
if new_ai > 0 and new_ai < C: new_b = new_b1
elif new_aj > 0 and new_aj < C: new_b = new_b2
else: new_b = (new_b1 + new_b2) / 2.0
update_forecasterror(i, new_ai, j, new_aj, new_b)
gv.models[gv.modelnum].a[i] = new_ai
gv.models[gv.modelnum].a[j] = new_aj
gv.models[gv.modelnum].b = new_b
updated = True
print ("迭代次数: %d, 修改组合: i: %d 与 j:%d" %(time, i, j))
break
# 测试数据
def test():
record = 0
record_correct = 0
for img in gv.tests:
print ("正在测试:", img.filename)
for modelnum in range(10):
gv.modelnum = modelnum
if predict(img) > 0:
print ("测试结果:",modelnum)
record += 1
if modelnum == int(img.filename[0]):
record_correct += 1
break
print ("相关记录数量:", record)
print ("正确识别数量:", record_correct)
print ("正确识别比例:", record_correct/record)
print ("测试数据总量:", len(gv.tests))
if __name__ == "__main__":
print ("开始时间:",time.strftime('%Y-%m-%d %H:%M:%S'))
training = True
loaddata("train/", gv.samples)
loaddata("test/", gv.tests)
print ("训练数据个数:",len(gv.samples))
print ("测试数据个数:",len(gv.tests))
if training == True:
gv.init_cache()
gv.init_models()
print ("模型初始化成功!")
print ("当前时间:",time.strftime('%Y-%m-%d %H:%M:%S'))
T = 0.0001
C = 10
step = 0.001
gv.sigma = 1
if training == True:
for i in range(10):
print ("正在训练模型:", i)
train(T, 10, C, i, step)
save_models()
else:
load_models()
for i in range(10):
update_samples_label(i)
print ("训练完成时间:",time.strftime('%Y-%m-%d %H:%M:%S'))
test()
print ("测试完成时间:",time.strftime('%Y-%m-%d %H:%M:%S'))



The link of this page is https://blog.nooa.tech/articles/7dac38a/ . Welcome to reproduce it!

© 2018.02.08 - 2024.05.25 Mengmeng Kuang  保留所有权利!

:D 获取中...

Creative Commons License