Statistical Learning Methods: A Simple KNN Implementation [Python]

Notes on Chapter 3 of Li Hang's 《统计学习方法》 (Statistical Learning Methods).

Basic background on KNN (the k-Nearest Neighbors algorithm) and some experiments with it.

Basics of KNN

See the Wikipedia article for details; for now we only look at the classification use case.

Description of the algorithm:

(1) In the k-nearest neighbor method, once the training set, the distance metric (e.g. Euclidean distance), the value of k, and the classification decision rule (e.g. majority voting) are fixed, the class of any new input instance is uniquely determined.

(2) A smaller k makes the overall model more complex and more prone to overfitting. In practice k is usually a small value, and cross-validation is normally used to pick the best k (see the sketch below).
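As an illustration of point (2), here is a minimal sketch of choosing k by cross-validation. It assumes scikit-learn is installed and uses sklearn's built-in digits dataset purely for illustration, not the data used later in this post.

import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import cross_val_score
from sklearn.neighbors import KNeighborsClassifier

# small built-in digits dataset, only for illustration
X, y = load_digits(return_X_y=True)

# try several candidate k values and keep the one with the best
# mean 5-fold cross-validated accuracy
best_k, best_score = None, -np.inf
for k in (1, 3, 5, 7, 9, 11):
    scores = cross_val_score(KNeighborsClassifier(n_neighbors=k), X, y, cv=5)
    if scores.mean() > best_score:
        best_k, best_score = k, scores.mean()

print('best k =', best_k, 'cv accuracy = %.3f' % best_score)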

Python Implementation

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from collections import Counter


# S1 --> randomly generate the training set and label it


# simulate labeling the raw data
def sign(Px, D, classx):
    for px in Px:
        D[tuple(px)] = classx

# get train_data
def get_train_data():
    # D stores all the labeled instance points
    D = {}
    P1 = np.random.random((100, 2))
    sign(P1, D, 1)

    P2 = np.random.random((100, 2)) - 0.7
    sign(P2, D, 2)

    P3 = np.random.random((100, 2)) + 0.7
    sign(P3, D, 3)

    plt.plot(P1[:, 0], P1[:, 1], 'ro', label='class1', alpha=0.7)
    plt.plot(P2[:, 0], P2[:, 1], 'go', label='class2', alpha=0.7)
    plt.plot(P3[:, 0], P3[:, 1], 'bo', label='class3', alpha=0.7)

    return D


# S2 --> algorithm implementation
def calc_dist(px, py):
    return np.linalg.norm(np.array(px) - np.array(py))

# sort the training points by their distance to the query point
def get_rank(D, p0):
    plt.plot(p0[0], p0[1], 'k*', markersize=15, label="Test Point")
    point_dist = {}
    for px in D.keys():
        point_dist[px] = calc_dist(px, p0)
    point_dist = pd.Series(point_dist)
    point_dist.sort_values(inplace=True)

    return point_dist

# take the k nearest data points
def get_knn(point_dist, k=1):
    knn = point_dist[:k].index
    knn_x = [i[0] for i in list(knn)]
    knn_y = [i[1] for i in list(knn)]

    plt.plot(knn_x, knn_y, 'yo', markersize=5, label="k nearest neighbors")

    return knn

# determine the class of the query point
def get_class(knn, D):
    class_all = [D[point] for point in knn]
    # print(class_all)
    c = Counter(class_all)
    # majority vote
    classx = c.most_common(1)[0][0]

    return classx

# S3 --> run a test

def run(p0=(0, 1)):
    D = get_train_data()
    point_dist = get_rank(D, p0)
    knn = get_knn(point_dist, 3)
    class_x = get_class(knn, D)
    print(p0, 'belongs to class:', class_x)
    plt.legend()
    plt.title('raw knn')
    plt.show()

if __name__ == '__main__':
    run((0, 0.5))

Output: (scatter plot of the three classes, the test point, and its k nearest neighbors)

A further application: handwritten digit recognition.

Note: three versions of the implementation follow. The first two differ only in how distances are computed and are both slow (the second is faster, but still takes 10+ minutes); the last one is fast, finishing in about 1 minute (see the reference here). A simple way to time the versions is sketched after the V2.0 code below.

(1) Code way1 --> V1.0 (mostly reuses the approach above; extremely slow)
(2) Code way2 --> V2.0 (somewhat faster, but still very slow)
import os
import numpy as np
import pandas as pd
from collections import Counter

# get data from file
def get_data(filepath):
    data = {}
    files = os.listdir(os.getcwd() + filepath)
    for file in files:
        classx = file.split("_")[0]
        with open(os.getcwd() + filepath + '/' + file, 'r') as f:
            # each file holds a 32x32 text image; flatten it into one string key
            key = f.read().replace('\n', '')
            data[key] = classx

    return data

# get training data
def get_training_data():
    training_data = get_data("/trainingDigits")
    return training_data


# get test data
def get_test_data():
    test_data = get_data("/testDigits")
    return test_data


# S2 --> algorithm implementation

# way1 ---> extremely slow (kept for reference, commented out)
'''
def calc_dist(px, py):
    px = np.array([float(i) for i in np.array(list(px))])
    py = np.array([float(i) for i in np.array(list(py))])

    return np.linalg.norm(px - py)

# sort the training points by their distance to the query point
def get_rank(training_data, p0):
    point_dist = {}
    for px in training_data.keys():
        point_dist[px] = calc_dist(px, p0)
    point_dist = pd.Series(point_dist)
    point_dist.sort_values(inplace=True)

    return point_dist
'''

# way2 --> somewhat faster, but still slow
def calc_dist(training_data, p0):
    def str2array(p):
        return np.array([float(i) for i in list(p)])

    numSamples = len(training_data)
    training_data_pro = [str2array(p) for p in list(training_data.keys())]
    p0 = str2array(p0)
    # step 1: calculate Euclidean distance
    # tile(A, reps): construct an array by repeating A reps times
    diff = np.matrix(np.tile(p0, (numSamples, 1))) - np.matrix(training_data_pro)
    squreDiff = np.square(diff)
    squreDist = np.sum(squreDiff, axis=1)  # sum is performed per row
    distance = np.sqrt(squreDist)

    distance = [distance[i, 0] for i in range(len(distance))]
    point_dist = pd.Series(distance, index=list(training_data.keys()))
    point_dist.sort_values(inplace=True)

    return point_dist

# sort the training points by their distance to the query point
def get_rank(training_data, p0):
    point_dist = calc_dist(training_data, p0)
    return point_dist


# take the k nearest data points
def get_knn(point_dist, k=1):
    knn = point_dist[:k].index

    return knn

# determine the class of the query point
def get_class(knn, training_data):
    class_all = [training_data[point] for point in knn]
    # print(class_all)
    c = Counter(class_all)
    # majority vote
    classx = c.most_common(1)[0][0]

    return classx


# S3 --> run the test

def classify(p0, training_data):
    point_dist = get_rank(training_data, p0)
    knn = get_knn(point_dist, 3)
    class_x = get_class(knn, training_data)
    print(p0, 'belongs to class:', class_x)

    return class_x

def run():
    training_data = get_training_data()
    test_data = get_test_data()
    right = 0
    total = 0
    for point in test_data.keys():
        total += 1
        print(total)
        classx = classify(point, training_data)
        if test_data[point] == classx:
            right += 1
    print("total-->", total)
    print("right-->", right)
    print('accuracy-->', right / total)

if __name__ == '__main__':
    run()
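To make the speed comparison in the note above concrete, one could simply wall-clock each version with time.perf_counter. This is only a sketch: the timed helper is mine, and the commented example calls assume it is pasted at the bottom of the corresponding script.

import time

# small helper: run a zero-argument callable and report its wall-clock time
def timed(fn, label):
    start = time.perf_counter()
    fn()
    print('%s took %.1f s' % (label, time.perf_counter() - start))

# example usage (hypothetical, pasted at the bottom of one of the scripts):
# timed(run, 'V2.0 (loop-based)')
# timed(testHandWritingClass, 'V3.0 (vectorized)')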

(3) Code way3 --> V3.0 (speed greatly improved)

import os
from numpy import *


# classify using KNN
def KNNClassify(newInput, dataSet, labels, k):
    numSamples = dataSet.shape[0]  # row number
    # step 1: calculate Euclidean distance
    # tile(A, reps): construct an array by repeating A reps times
    diff = tile(newInput, (numSamples, 1)) - dataSet
    squreDiff = diff ** 2
    squreDist = sum(squreDiff, axis=1)  # sum is performed per row
    distance = squreDist ** 0.5

    # step 2: sort the distances
    # argsort() returns the indices that would sort an array in ascending order
    sortedDistIndices = argsort(distance)

    classCount = {}
    for i in range(k):
        # step 3: choose the k samples with the smallest distances
        voteLabel = labels[sortedDistIndices[i]]

        # step 4: count the times each label occurs
        # when the key voteLabel is not in dictionary classCount,
        # get() will return 0
        classCount[voteLabel] = classCount.get(voteLabel, 0) + 1

    # step 5: return the label with the most votes
    maxCount = 0
    for label, count in classCount.items():
        if count > maxCount:
            maxCount = count
            maxIndex = label

    return maxIndex


# convert image to vector
def img2vector(filename):
    rows = 32
    cols = 32
    imgVector = zeros((1, rows * cols))
    fileIn = open(filename)
    for row in range(rows):
        lineStr = fileIn.readline()
        for col in range(cols):
            imgVector[0, row * 32 + col] = int(lineStr[col])
    fileIn.close()

    return imgVector


# load dataSet
def loadDataSet():
    ## step 1: getting the training set
    print("---Getting training set...")
    dataSetDir = '/home/shen/PycharmProjects/MyPython/统计学习方法/KNN/digits/'
    trainingFileList = os.listdir(dataSetDir + 'trainingDigits')  # load the training set
    numSamples = len(trainingFileList)

    train_x = zeros((numSamples, 1024))
    train_y = []
    for i in range(numSamples):
        filename = trainingFileList[i]

        # get train_x
        train_x[i, :] = img2vector(dataSetDir + 'trainingDigits/%s' % filename)

        # get label from file name such as "1_18.txt"
        label = int(filename.split('_')[0])  # returns 1
        train_y.append(label)

    ## step 2: getting the testing set
    print("---Getting testing set...")
    testingFileList = os.listdir(dataSetDir + 'testDigits')  # load the testing set
    numSamples = len(testingFileList)
    test_x = zeros((numSamples, 1024))
    test_y = []
    for i in range(numSamples):
        filename = testingFileList[i]

        # get test_x
        test_x[i, :] = img2vector(dataSetDir + 'testDigits/%s' % filename)

        # get label from file name such as "1_18.txt"
        label = int(filename.split('_')[0])  # returns 1
        test_y.append(label)

    return train_x, train_y, test_x, test_y


# test handwriting classification
def testHandWritingClass():
    ## step 1: load data
    print("step 1: load data...")
    train_x, train_y, test_x, test_y = loadDataSet()

    ## step 2: training (KNN has no explicit training step)
    print("step 2: training...")
    pass

    ## step 3: testing
    print("step 3: testing...")
    numTestSamples = test_x.shape[0]
    matchCount = 0
    for i in range(numTestSamples):
        predict = KNNClassify(test_x[i], train_x, train_y, 3)
        if predict == test_y[i]:
            matchCount += 1
    accuracy = float(matchCount) / numTestSamples

    ## step 4: show the result
    print("step 4: show the result...")
    print('The classify accuracy is: %.2f%%' % (accuracy * 100))


testHandWritingClass()


Output: (the accuracy printed by testHandWritingClass)

Things to improve and study further

In the end, the reference code from others is much faster, and the main difference seems to lie in NumPy's vectorized matrix operations. This is something I need to study in depth later, since execution speed really matters.
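As a rough check of that intuition, the sketch below compares a per-sample Python loop with a single broadcasted NumPy expression on random data of the same shape as the digit vectors. The data and timings are illustrative only, not taken from the experiments above.

import time
import numpy as np

# 2000 fake "training" vectors of dimension 1024 (same shape as the digit data)
# and one query vector, all random
train = np.random.random((2000, 1024))
query = np.random.random(1024)

# loop version: one np.linalg.norm call per training sample
start = time.perf_counter()
d_loop = np.array([np.linalg.norm(query - row) for row in train])
t_loop = time.perf_counter() - start

# vectorized version: broadcasting computes all differences at once
start = time.perf_counter()
d_vec = np.sqrt(np.sum((train - query) ** 2, axis=1))
t_vec = time.perf_counter() - start

print('loop: %.4f s, vectorized: %.4f s' % (t_loop, t_vec))
print('results match:', np.allclose(d_loop, d_vec))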

击蒙御寇