手撕MLP和CNN

介绍利用pytorch、numpy分别实现MLP和CNN网络架构,并且进行手写数字识别任务,详细分析原理。

MLP 和 CNN 原理介绍

在深入探讨具体的代码实现之前,我们非常有必要先从底层的数学公式和矩阵求导法则入手,梳理多层感知机(MLP)与卷积神经网络(CNN)的前向与反向传播原理。理解了这些核心的张量微积分运算,再去看纯 NumPy 框架下的数组操作,就会变得顺理成章。以下是相关网络架构和优化器的核心数学原理。

多层感知机 (MLP) 的数学原理

多层感知机通过线性变换与非线性激活函数的堆叠,实现对高维数据的特征映射。

线性层 (Linear / Fully Connected)

前向传播:
对于输入张量 XRN×DX \in \mathbb{R}^{N \times D}(其中 NN 为 Batch Size,DD 为输入维度),权重矩阵 WRD×MW \in \mathbb{R}^{D \times M},偏置向量 bRMb \in \mathbb{R}^{M},前向计算公式为:

Z=XW+bZ = XW + b

反向传播:
在反向传播中,假设从后一层传来的损失函数对当前层输出 ZZ 的梯度为 LZRN×M\frac{\partial L}{\partial Z} \in \mathbb{R}^{N \times M}。根据矩阵求导链式法则,我们需要计算对 WWbb 的梯度(用于参数更新)以及对 XX 的梯度(用于继续向后传):

dW=LW=XTLZdW = \frac{\partial L}{\partial W} = X^T \frac{\partial L}{\partial Z}

db=Lb=i=1N(LZ)i(沿 Batch 维度求和)db = \frac{\partial L}{\partial b} = \sum_{i=1}^{N} \left( \frac{\partial L}{\partial Z} \right)_i \quad \text{(沿 Batch 维度求和)}

dX=LX=LZWTdX = \frac{\partial L}{\partial X} = \frac{\partial L}{\partial Z} W^T

激活函数 (ReLU)

前向传播:

A=max(0,Z)A = \max(0, Z)

反向传播:
ReLU 的导数是一个分段函数(掩码)。当输入 Z>0Z > 0 时,梯度原样传递;否则梯度截断为 0:

dZ=dAI(Z>0)dZ = dA \odot \mathbb{I}(Z > 0)

(其中 \odot 表示逐元素乘法,I\mathbb{I} 为指示函数)

交叉熵损失函数与 Softmax

在多分类任务中,通常将 Softmax 与负对数似然损失(Negative Log-Likelihood)结合使用,以避免数值溢出。

前向传播:
给定模型输出的 Logits ZRN×CZ \in \mathbb{R}^{N \times C}CC 为类别数),首先通过 Softmax 计算概率 PP

Pi,j=exp(Zi,jmaxkZi,k)k=1Cexp(Zi,kmaxkZi,k)P_{i,j} = \frac{\exp(Z_{i,j} - \max_k Z_{i,k})}{\sum_{k=1}^C \exp(Z_{i,k} - \max_k Z_{i,k})}

随后计算交叉熵损失 LL(假设目标标签 YY 为 One-hot 编码):

L=1Ni=1Nj=1CYi,jlog(Pi,j)L = -\frac{1}{N} \sum_{i=1}^N \sum_{j=1}^C Y_{i,j} \log(P_{i,j})

反向传播:
Softmax 结合交叉熵的对 ZZ 的求导结果异常简洁,这也是为什么它们常被绑定在一起实现的原因:

dZ=LZ=1N(PY)dZ = \frac{\partial L}{\partial Z} = \frac{1}{N} (P - Y)


卷积神经网络 (CNN) 的数学原理

CNN 通过局部感受野和权重共享机制,极大地减少了参数量,非常适合处理二维图像这类具有空间平移不变性的数据。

二维卷积层 (Conv2d)

前向传播:
假设输入张量为 XX(大小为 N×Cin×H×WN \times C_{in} \times H \times W),卷积核为 KK(大小为 Cout×Cin×kh×kwC_{out} \times C_{in} \times k_h \times k_w),偏置为 bb。输出特征图的长宽计算公式为:

Hout=H+2×paddingkhstride+1H_{out} = \left\lfloor \frac{H + 2 \times \text{padding} - k_h}{\text{stride}} \right\rfloor + 1

在前向滑窗过程中,输出特征图的第 cc 个通道上的某个点 (i,j)(i, j) 的值为该窗口内输入数据与对应卷积核的 Frobenius 内积加上偏置:

Yn,c,i,j=c=1Cinu=0kh1v=0kw1Xn,c,i×s+u,j×s+vKc,c,u,v+bcY_{n, c, i, j} = \sum_{c'=1}^{C_{in}} \sum_{u=0}^{k_h-1} \sum_{v=0}^{k_w-1} X_{n, c', i \times s + u, j \times s + v} \cdot K_{c, c', u, v} + b_c

反向传播:
在 NumPy 的底层实现中,这通常通过累加局部窗口的梯度来完成(对应代码中的嵌套循环):

  • 计算对偏置的梯度 dbdb:直接对传入的梯度 L/Y\partial L / \partial Y 在 Batch 以及空间维度 (Hout,WoutH_{out}, W_{out}) 上求和。
  • 计算对权重的梯度 dKdK:输入特征图的对应感受野切片与输出梯度相乘并累加。
  • 计算对输入的梯度 dXdX:将输出梯度乘以卷积核权重,并累加回输入特征图对应的感受野位置。

最大池化层 (MaxPool2d)

前向传播:
在定义的滑窗内取最大值,实现下采样:

Yn,c,i,j=maxu,vXn,c,i×s+u,j×s+vY_{n, c, i, j} = \max_{u, v} X_{n, c, i \times s + u, j \times s + v}

反向传播:
池化层没有可学习参数,其反向传播的作用是“路由”梯度。由于前向传播时只有窗口内的最大值参与了计算,因此在反向传播时,接收到的梯度只传递给该最大值所在的位置,窗口内其他位置的梯度为 0(掩码机制)。

优化器:Adam (Adaptive Moment Estimation)

Adam 结合了动量法(Momentum)和 RMSProp 的思想,为每个参数自适应地计算独立的学习率。

在时间步 tt,针对某一参数矩阵 θ\theta(其梯度为 gtg_t):

  1. 更新一阶矩估计(均值): mt=β1mt1+(1β1)gtm_t = \beta_1 m_{t-1} + (1 - \beta_1) g_t
  2. 更新二阶矩估计(未中心化的方差): vt=β2vt1+(1β2)gt2v_t = \beta_2 v_{t-1} + (1 - \beta_2) g_t^2
  3. 偏差修正(针对初始阶段的零偏置):

    m^t=mt1β1t\hat{m}_t = \frac{m_t}{1 - \beta_1^t}

    v^t=vt1β2t\hat{v}_t = \frac{v_t}{1 - \beta_2^t}

  4. 参数更新:

    θt=θt1αm^tv^t+ϵ\theta_{t} = \theta_{t-1} - \alpha \frac{\hat{m}_t}{\sqrt{\hat{v}_t} + \epsilon}

    (其中 α\alpha 为学习率,β1\beta_1β2\beta_2 为衰减率常数,ϵ\epsilon 为防止除零的极小值)。

理解了上述前向计算维度与反向误差传递的求导链条后,我们即可将这些数学逻辑映射到 NumPy 的矩阵运算和 PyTorch 的动态图机制中。

纯 NumPy 实现篇

这部分代码从零构建了张量运算、前向传播、反向传播(链式求导)以及优化器,非常适合用来理解深度学习的底层数学原理。

NumPy 实现多层感知机 (MLP)

包含完整的全连接层、ReLU 激活函数和交叉熵损失函数的底层实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

# ==========================================
# 1. 设置超参数与设备
# ==========================================
batch_size = 64 # 每次处理的数据量
learning_rate = 0.001 # 学习率
epochs = 5 # 训练轮数

# Numpy 默认使用 CPU 进行计算。如果你配置了 CuPy,可以将 np 替换为 cp 以实现 GPU 加速。
print("当前使用的计算库:纯 NumPy (CPU)")

# ==========================================
# 2. 数据准备与加载 (MNIST 数据集)
# ==========================================
print("正在下载/加载 MNIST 数据集,这可能需要一点时间...")
# 使用 sklearn 获取 MNIST 数据 (70000张图片,已被展平为 784 维向量)
X, y = fetch_openml('mnist_784', version=1, return_X_y=True, as_frame=False, parser='auto')

# 模拟 transforms.ToTensor():将像素值缩放到 [0, 1] 区间
X = X / 255.0
# 模拟 transforms.Normalize:MNIST 全局均值和标准差
X = (X - 0.1307) / 0.3081
# 转换标签类型
y = y.astype(int)

# 划分训练集 (60000) 和测试集 (10000),模拟 PyTorch 默认切分
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=10000, random_state=42)

# 定义一个 DataLoader 生成器来模拟 torch.utils.data.DataLoader
def data_loader(X, y, batch_size, shuffle=True):
n_samples = X.shape[0]
indices = np.arange(n_samples)
if shuffle:
np.random.shuffle(indices)
for start_idx in range(0, n_samples, batch_size):
end_idx = min(start_idx + batch_size, n_samples)
batch_idx = indices[start_idx:end_idx]
yield X[batch_idx], y[batch_idx]

# ==========================================
# 3. 定义网络层与多层感知机 (MLP)
# ==========================================
class Flatten:
"""将任意维度的输入展平为二维 (Batch_size, -1)"""
def forward(self, x):
self.original_shape = x.shape
return x.reshape(x.shape[0], -1)

def backward(self, dout):
return dout.reshape(self.original_shape)

class Linear:
"""全连接层 (y = xW + b)"""
def __init__(self, in_features, out_features):
# 使用 He 初始化 (适用于 ReLU 激活函数)
self.W = np.random.randn(in_features, out_features) * np.sqrt(2.0 / in_features)
self.b = np.zeros(out_features)

# 梯度容器
self.dW = np.zeros_like(self.W)
self.db = np.zeros_like(self.b)
self.x = None

def forward(self, x):
self.x = x
return np.dot(x, self.W) + self.b

def backward(self, dout):
# 计算梯度:dW, db 以及传给下一层的 dx
self.dW += np.dot(self.x.T, dout)
self.db += np.sum(dout, axis=0)
dx = np.dot(dout, self.W.T)
return dx

class ReLU:
"""ReLU 激活函数"""
def __init__(self):
self.x = None

def forward(self, x):
self.x = x
return np.maximum(0, x)

def backward(self, dout):
dx = dout.copy()
dx[self.x <= 0] = 0
return dx

class MLP:
"""多层感知机模型"""
def __init__(self):
self.flatten = Flatten()
# 顺序组合所有的层
self.network = [
Linear(28 * 28, 512),
ReLU(),
Linear(512, 256),
ReLU(),
Linear(256, 10)
]

def forward(self, x):
x = self.flatten.forward(x)
for layer in self.network:
x = layer.forward(x)
return x

def backward(self, dout):
# 反向传播顺序要倒过来
for layer in reversed(self.network):
dout = layer.backward(dout)
dout = self.flatten.backward(dout)
return dout

def get_params_and_grads(self):
# 提取模型中所有需要更新权重的层 (Linear)
layers_with_params = [layer for layer in self.network if isinstance(layer, Linear)]
return layers_with_params

model = MLP()

# ==========================================
# 4. 定义损失函数与优化器
# ==========================================
class CrossEntropyLoss:
"""交叉熵损失 (结合了 Softmax)"""
def __init__(self):
self.probs = None
self.targets = None

def forward(self, logits, targets):
self.targets = targets
N = logits.shape[0]

# 减去最大值防止指数爆炸 (数值稳定性)
shifted_logits = logits - np.max(logits, axis=1, keepdims=True)
exp_scores = np.exp(shifted_logits)
self.probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)

# 计算负对数似然损失
correct_logprobs = -np.log(self.probs[range(N), targets] + 1e-8)
loss = np.sum(correct_logprobs) / N
return loss

def backward(self):
# 反向传播求导:softmax 输出减去 One-hot 标签
N = self.probs.shape[0]
dx = self.probs.copy()
dx[range(N), self.targets] -= 1
dx /= N
return dx

class AdamOptimizer:
"""纯 NumPy 实现的 Adam 优化器"""
def __init__(self, layers, lr=0.001, beta1=0.9, beta2=0.999, eps=1e-8):
self.layers = layers
self.lr = lr
self.beta1, self.beta2, self.eps = beta1, beta2, eps
self.t = 0

# 为每层的 W 和 b 预先分配动量矩阵 (m: 一阶矩, v: 二阶矩)
self.m = [{'W': np.zeros_like(l.W), 'b': np.zeros_like(l.b)} for l in self.layers]
self.v = [{'W': np.zeros_like(l.W), 'b': np.zeros_like(l.b)} for l in self.layers]

def zero_grad(self):
"""清空上一轮的梯度"""
for l in self.layers:
l.dW.fill(0)
l.db.fill(0)

def step(self):
"""更新权重"""
self.t += 1
for i, l in enumerate(self.layers):
# 更新 W
self.m[i]['W'] = self.beta1 * self.m[i]['W'] + (1 - self.beta1) * l.dW
self.v[i]['W'] = self.beta2 * self.v[i]['W'] + (1 - self.beta2) * (l.dW ** 2)
m_hat_W = self.m[i]['W'] / (1 - self.beta1 ** self.t) # 偏差修正
v_hat_W = self.v[i]['W'] / (1 - self.beta2 ** self.t) # 偏差修正
l.W -= self.lr * m_hat_W / (np.sqrt(v_hat_W) + self.eps)

# 更新 b
self.m[i]['b'] = self.beta1 * self.m[i]['b'] + (1 - self.beta1) * l.db
self.v[i]['b'] = self.beta2 * self.v[i]['b'] + (1 - self.beta2) * (l.db ** 2)
m_hat_b = self.m[i]['b'] / (1 - self.beta1 ** self.t)
v_hat_b = self.v[i]['b'] / (1 - self.beta2 ** self.t)
l.b -= self.lr * m_hat_b / (np.sqrt(v_hat_b) + self.eps)


criterion = CrossEntropyLoss()
optimizer = AdamOptimizer(model.get_params_and_grads(), lr=learning_rate)

# ==========================================
# 5. 模型训练
# ==========================================
print("开始训练模型...")
num_batches = len(X_train) // batch_size + (1 if len(X_train) % batch_size != 0 else 0)

for epoch in range(epochs):
running_loss = 0.0

# 获取 DataLoader (生成器)
train_loader = data_loader(X_train, y_train, batch_size, shuffle=True)

for batch_idx, (data, targets) in enumerate(train_loader):
# 反向传播前需先清零梯度
optimizer.zero_grad()

# 前向传播
scores = model.forward(data)

# 计算损失
loss = criterion.forward(scores, targets)

# 反向传播 (从损失函数开始求导并传递给模型)
dout = criterion.backward()
model.backward(dout)

# 更新权重
optimizer.step()

running_loss += loss

print(f"Epoch [{epoch+1}/{epochs}], 平均 Loss: {running_loss/num_batches:.4f}")

# ==========================================
# 6. 模型评估 (测试集)
# ==========================================
print("开始评估模型...")
correct = 0
total = 0

# 相当于 with torch.no_grad(): 不需要记录和计算反向传播
test_loader = data_loader(X_test, y_test, batch_size, shuffle=False)

for data, targets in test_loader:
# 前向传播
scores = model.forward(data)

# 取出每个样本概率最大 (分数最高) 的类别索引
predictions = np.argmax(scores, axis=1)

# 统计预测准确数
correct += np.sum(predictions == targets)
total += len(targets)

accuracy = 100 * correct / total
print(f"测试集准确率: {accuracy:.2f}%")

NumPy 实现卷积神经网络 (CNN)

展示了如何通过嵌套循环手动实现 Conv2dMaxPool2d 的滑窗机制及其梯度反传。(注:由于纯 Python 循环执行卷积,运行速度会非常慢。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

# ==========================================
# 1. 设置超参数与设备
# ==========================================
batch_size = 64 # 每次处理的数据量
learning_rate = 0.001 # 学习率
epochs = 5 # 训练轮数

print("当前使用的计算库: 纯 NumPy (CPU)")

# ==========================================
# 2. 数据准备与加载 (MNIST 数据集)
# ==========================================
print("正在下载/加载 MNIST 数据集,这可能需要一点时间...")
# 获取 MNIST 数据 (已被展平为 784 维向量)
X, y = fetch_openml('mnist_784', version=1, return_X_y=True, as_frame=False, parser='auto')

# 预处理:将像素值缩放到 [0, 1] 区间
X = X / 255.0
# 预处理:标准化 (均值 0.1307, 标准差 0.3081)
X = (X - 0.1307) / 0.3081
# 转换标签类型
y = y.astype(int)

# ⚠️ 关键步骤:CNN 需要 2D 图像输入,将 (N, 784) 转换为 (N, 通道数, 高, 宽) -> (N, 1, 28, 28)
X = X.reshape(-1, 1, 28, 28)

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=10000, random_state=42)

def data_loader(X, y, batch_size, shuffle=True):
"""模拟 DataLoader 生成器"""
n_samples = X.shape[0]
indices = np.arange(n_samples)
if shuffle:
np.random.shuffle(indices)
for start_idx in range(0, n_samples, batch_size):
end_idx = min(start_idx + batch_size, n_samples)
batch_idx = indices[start_idx:end_idx]
yield X[batch_idx], y[batch_idx]

# ==========================================
# 3. 定义网络层与 CNN 模型
# ==========================================
class Conv2d:
"""二维卷积层 (简单滑窗实现)"""
def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1):
self.in_channels = in_channels
self.out_channels = out_channels
self.kernel_size = kernel_size
self.stride = stride
self.padding = padding

# He 初始化
fan_in = in_channels * kernel_size * kernel_size
self.W = np.random.randn(out_channels, in_channels, kernel_size, kernel_size) * np.sqrt(2.0 / fan_in)
self.b = np.zeros(out_channels)

self.dW = np.zeros_like(self.W)
self.db = np.zeros_like(self.b)

def forward(self, x):
N, C, H, W = x.shape
out_h = (H + 2 * self.padding - self.kernel_size) // self.stride + 1
out_w = (W + 2 * self.padding - self.kernel_size) // self.stride + 1

# 边缘填充
self.x_padded = np.pad(x, ((0,0), (0,0), (self.padding, self.padding), (self.padding, self.padding)), mode='constant')
out = np.zeros((N, self.out_channels, out_h, out_w))

# 滑动窗口计算卷积
for i in range(out_h):
for j in range(out_w):
h_start, w_start = i * self.stride, j * self.stride
h_end, w_end = h_start + self.kernel_size, w_start + self.kernel_size

# 取出窗口数据 (N, C, K, K)
x_slice = self.x_padded[:, :, h_start:h_end, w_start:w_end]

# 计算每个输出通道的值
for k in range(self.out_channels):
out[:, k, i, j] = np.sum(x_slice * self.W[k, ...], axis=(1, 2, 3)) + self.b[k]
return out

def backward(self, dout):
N, _, out_h, out_w = dout.shape
dx_padded = np.zeros_like(self.x_padded)
self.dW.fill(0)
self.db.fill(0)

for i in range(out_h):
for j in range(out_w):
h_start, w_start = i * self.stride, j * self.stride
h_end, w_end = h_start + self.kernel_size, w_start + self.kernel_size

for k in range(self.out_channels):
self.db[k] += np.sum(dout[:, k, i, j])

# 累加梯度到 dW 和 dx
dout_val = (dout[:, k, i, j])[:, None, None, None] # 调整维度以便广播
self.dW[k] += np.sum(self.x_padded[:, :, h_start:h_end, w_start:w_end] * dout_val, axis=0)
dx_padded[:, :, h_start:h_end, w_start:w_end] += self.W[k, ...] * dout_val

# 去除 padding 部分得到最终对 x 的梯度
if self.padding > 0:
dx = dx_padded[:, :, self.padding:-self.padding, self.padding:-self.padding]
else:
dx = dx_padded
return dx

class MaxPool2d:
"""二维最大池化层"""
def __init__(self, kernel_size=2, stride=2):
self.kernel_size = kernel_size
self.stride = stride

def forward(self, x):
self.x = x
N, C, H, W = x.shape
out_h = H // self.kernel_size
out_w = W // self.kernel_size
out = np.zeros((N, C, out_h, out_w))

for i in range(out_h):
for j in range(out_w):
h_start, w_start = i * self.stride, j * self.stride
h_end, w_end = h_start + self.kernel_size, w_start + self.kernel_size

x_slice = x[:, :, h_start:h_end, w_start:w_end]
out[:, :, i, j] = np.max(x_slice, axis=(2, 3))
return out

def backward(self, dout):
N, C, out_h, out_w = dout.shape
dx = np.zeros_like(self.x)

for i in range(out_h):
for j in range(out_w):
h_start, w_start = i * self.stride, j * self.stride
h_end, w_end = h_start + self.kernel_size, w_start + self.kernel_size

x_slice = self.x[:, :, h_start:h_end, w_start:w_end]
# 找到窗口内的最大值,生成掩码 (Mask)
max_val = np.max(x_slice, axis=(2, 3), keepdims=True)
mask = (x_slice == max_val)
# 只有最大值的位置接收梯度
dx[:, :, h_start:h_end, w_start:w_end] += mask * (dout[:, :, i, j])[:, :, None, None]
return dx

class Linear:
"""全连接层"""
def __init__(self, in_features, out_features):
self.W = np.random.randn(in_features, out_features) * np.sqrt(2.0 / in_features)
self.b = np.zeros(out_features)
self.dW = np.zeros_like(self.W)
self.db = np.zeros_like(self.b)

def forward(self, x):
self.x = x
return np.dot(x, self.W) + self.b

def backward(self, dout):
self.dW = np.dot(self.x.T, dout)
self.db = np.sum(dout, axis=0)
return np.dot(dout, self.W.T)

class ReLU:
def forward(self, x):
self.x = x
return np.maximum(0, x)

def backward(self, dout):
dx = dout.copy()
dx[self.x <= 0] = 0
return dx

class Flatten:
def forward(self, x):
self.original_shape = x.shape
return x.reshape(x.shape[0], -1)

def backward(self, dout):
return dout.reshape(self.original_shape)

class CNN:
"""组合 CNN 模型"""
def __init__(self):
# 特征提取层
self.features = [
Conv2d(1, 16, kernel_size=3, stride=1, padding=1),
ReLU(),
MaxPool2d(kernel_size=2, stride=2),
Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
ReLU(),
MaxPool2d(kernel_size=2, stride=2)
]
# 分类器层
self.classifier = [
Flatten(),
Linear(32 * 7 * 7, 128),
ReLU(),
Linear(128, 10)
]
self.network = self.features + self.classifier

def forward(self, x):
for layer in self.network:
x = layer.forward(x)
return x

def backward(self, dout):
for layer in reversed(self.network):
dout = layer.backward(dout)
return dout

def get_params_and_grads(self):
# 提取需要更新权重的层 (Conv2d 和 Linear)
return [layer for layer in self.network if hasattr(layer, 'W')]

model = CNN()

# ==========================================
# 4. 定义损失函数与优化器
# ==========================================
class CrossEntropyLoss:
def forward(self, logits, targets):
self.targets = targets
N = logits.shape[0]
shifted_logits = logits - np.max(logits, axis=1, keepdims=True)
exp_scores = np.exp(shifted_logits)
self.probs = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
correct_logprobs = -np.log(self.probs[range(N), targets] + 1e-8)
return np.sum(correct_logprobs) / N

def backward(self):
N = self.probs.shape[0]
dx = self.probs.copy()
dx[range(N), self.targets] -= 1
dx /= N
return dx

class AdamOptimizer:
def __init__(self, layers, lr=0.001, beta1=0.9, beta2=0.999, eps=1e-8):
self.layers = layers
self.lr, self.beta1, self.beta2, self.eps = lr, beta1, beta2, eps
self.t = 0
self.m = [{'W': np.zeros_like(l.W), 'b': np.zeros_like(l.b)} for l in layers]
self.v = [{'W': np.zeros_like(l.W), 'b': np.zeros_like(l.b)} for l in layers]

def zero_grad(self):
for l in self.layers:
l.dW.fill(0)
l.db.fill(0)

def step(self):
self.t += 1
for i, l in enumerate(self.layers):
self.m[i]['W'] = self.beta1 * self.m[i]['W'] + (1 - self.beta1) * l.dW
self.v[i]['W'] = self.beta2 * self.v[i]['W'] + (1 - self.beta2) * (l.dW ** 2)
l.W -= self.lr * (self.m[i]['W'] / (1 - self.beta1 ** self.t)) / (np.sqrt(self.v[i]['W'] / (1 - self.beta2 ** self.t)) + self.eps)

self.m[i]['b'] = self.beta1 * self.m[i]['b'] + (1 - self.beta1) * l.db
self.v[i]['b'] = self.beta2 * self.v[i]['b'] + (1 - self.beta2) * (l.db ** 2)
l.b -= self.lr * (self.m[i]['b'] / (1 - self.beta1 ** self.t)) / (np.sqrt(self.v[i]['b'] / (1 - self.beta2 ** self.t)) + self.eps)

criterion = CrossEntropyLoss()
optimizer = AdamOptimizer(model.get_params_and_grads(), lr=learning_rate)

# ==========================================
# 5. 模型训练
# ==========================================
print("开始训练模型 (注意: 纯 NumPy 卷积计算很慢!)...")
num_batches = len(X_train) // batch_size + (1 if len(X_train) % batch_size != 0 else 0)

for epoch in range(epochs):
running_loss = 0.0
train_loader = data_loader(X_train, y_train, batch_size, shuffle=True)

for batch_idx, (data, targets) in enumerate(train_loader):
optimizer.zero_grad()

# 前向与反向传播
scores = model.forward(data)
loss = criterion.forward(scores, targets)
dout = criterion.backward()
model.backward(dout)

optimizer.step()
running_loss += loss

# 每 10 个 batch 打印一次,以免以为死机
if (batch_idx + 1) % 10 == 0:
print(f" Epoch {epoch+1}, Batch [{batch_idx+1}/{num_batches}] Loss: {loss:.4f}")

print(f"Epoch [{epoch+1}/{epochs}] 完成, 平均 Loss: {running_loss/num_batches:.4f}")

# ==========================================
# 6. 模型评估 (测试集)
# ==========================================
print("开始评估模型...")
correct, total = 0, 0
test_loader = data_loader(X_test, y_test, batch_size, shuffle=False)

for data, targets in test_loader:
scores = model.forward(data)
predictions = np.argmax(scores, axis=1)
correct += np.sum(predictions == targets)
total += len(targets)

accuracy = 100 * correct / total
print(f"测试集准确率: {accuracy:.2f}%")

PyTorch 实现篇

借助 PyTorch 的 nn.Module 和 Autograd 自动求导机制,代码大幅简化,同时支持调用底层 C++/CUDA 获得极高的运算效率。

PyTorch 实现多层感知机 (MLP)

利用 nn.Sequential 优雅地构建了模型,并使用 torchvision.datasets 高效地下载与加载 MNIST 数据集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# ==========================================
# 1. 设置超参数与设备
# ==========================================
batch_size = 64 # 每次处理的数据量
learning_rate = 0.001 # 学习率
epochs = 5 # 训练轮数

# 检查是否有 GPU 可用,如果有则使用 GPU 加速
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"当前使用的设备: {device}")

# ==========================================
# 2. 数据准备与加载 (MNIST 数据集)
# ==========================================
# 定义数据预处理:将图片转换为张量,并进行标准化处理
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,)) # MNIST 的全局均值和标准差
])

# 下载并加载训练集和测试集
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform)

# 使用 DataLoader 包装数据集,方便批量获取和打乱数据
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# ==========================================
# 3. 定义多层感知机 (MLP) 模型
# ==========================================
class MLP(nn.Module):
def __init__(self):
super(MLP, self).__init__()
# MNIST 图片大小是 28x28 像素的灰度图
# Flatten 层用于将 2D 图像展平为 1D 向量 (长度为 784)
self.flatten = nn.Flatten()

# 定义神经网络结构:输入层 -> 隐藏层1 -> 隐藏层2 -> 输出层
self.network = nn.Sequential(
nn.Linear(28 * 28, 512), # 第一层:784 -> 512
nn.ReLU(), # 激活函数
nn.Linear(512, 256), # 第二层:512 -> 256
nn.ReLU(), # 激活函数
nn.Linear(256, 10) # 输出层:256 -> 10 (对应 0-9 共 10 个数字的类别)
)

def forward(self, x):
x = self.flatten(x)
logits = self.network(x)
return logits

# 实例化模型并移动到指定设备 (CPU 或 GPU)
model = MLP().to(device)

# ==========================================
# 4. 定义损失函数与优化器
# ==========================================
# 分类问题通常使用交叉熵损失函数
criterion = nn.CrossEntropyLoss()
# 使用 Adam 优化器来更新网络权重
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# ==========================================
# 5. 模型训练
# ==========================================
print("开始训练模型...")
for epoch in range(epochs):
model.train() # 将模型设置为训练模式
running_loss = 0.0

for batch_idx, (data, targets) in enumerate(train_loader):
# 将数据和标签移动到指定设备
data = data.to(device)
targets = targets.to(device)

# 前向传播:计算模型输出
scores = model(data)
# 计算损失
loss = criterion(scores, targets)

# 反向传播:计算梯度之前先清零
optimizer.zero_grad()
loss.backward()

# 更新权重
optimizer.step()

running_loss += loss.item()

# 打印每个 epoch 的平均损失
print(f"Epoch [{epoch+1}/{epochs}], 平均 Loss: {running_loss/len(train_loader):.4f}")

# ==========================================
# 6. 模型评估 (测试集)
# ==========================================
print("开始评估模型...")
model.eval() # 将模型设置为评估模式 (会关闭 Dropout 等)
correct = 0
total = 0

# 评估时不需要计算梯度,可以节省内存并加速
with torch.no_grad():
for data, targets in test_loader:
data = data.to(device)
targets = targets.to(device)

# 获取模型预测结果
scores = model(data)
# 找出每个样本概率最大的类别索引
_, predictions = scores.max(1)

# 统计正确的数量
correct += (predictions == targets).sum().item()
total += targets.size(0)

accuracy = 100 * correct / total
print(f"测试集准确率: {accuracy:.2f}%")

PyTorch 实现卷积神经网络 (CNN)

在现代框架中实现标准 CNN 架构的标准姿势,包含特征提取层 (features) 与分类器 (classifier) 的清晰划分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# ==========================================
# 1. 设置超参数与设备
# ==========================================
batch_size = 64 # 每次处理的数据量
learning_rate = 0.001 # 学习率
epochs = 5 # 训练轮数

# 检查是否有 GPU 可用,如果有则使用 GPU 加速
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"当前使用的设备: {device}")

# ==========================================
# 2. 数据准备与加载 (MNIST 数据集)
# ==========================================
# 定义数据预处理:将图片转换为张量,并进行标准化处理
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,)) # MNIST 的全局均值和标准差
])

# 下载并加载训练集和测试集
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root='./data', train=False, transform=transform)

# 使用 DataLoader 包装数据集,方便批量获取和打乱数据
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# ==========================================
# 3. 定义卷积神经网络 (CNN) 模型
# ==========================================
class CNN(nn.Module):
def __init__(self):
super(CNN, self).__init__()

# 特征提取层 (卷积层 + 池化层)
self.features = nn.Sequential(
# 第一层卷积:输入通道 1 (灰度图),输出通道 16,卷积核 3x3,边缘填充 1
# 输入尺寸: [batch_size, 1, 28, 28] -> 输出: [batch_size, 16, 28, 28]
nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
# 最大池化层:窗口大小 2x2,步长 2
# 输出尺寸: [batch_size, 16, 14, 14]
nn.MaxPool2d(kernel_size=2, stride=2),

# 第二层卷积:输入通道 16,输出通道 32,卷积核 3x3,边缘填充 1
# 输出尺寸: [batch_size, 32, 14, 14]
nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
# 最大池化层:窗口大小 2x2,步长 2
# 输出尺寸: [batch_size, 32, 7, 7]
nn.MaxPool2d(kernel_size=2, stride=2)
)

# 分类器 (全连接层)
self.classifier = nn.Sequential(
nn.Flatten(), # 展平多维张量:32 * 7 * 7 = 1568
nn.Linear(32 * 7 * 7, 128), # 全连接层 1:1568 -> 128
nn.ReLU(), # 激活函数
nn.Linear(128, 10) # 全连接层 2 (输出层):128 -> 10 类别
)

def forward(self, x):
# 先通过特征提取层 (卷积等)
x = self.features(x)
# 再通过分类器获得分类 logits
logits = self.classifier(x)
return logits

# 实例化模型并移动到指定设备 (CPU 或 GPU)
model = CNN().to(device)

# ==========================================
# 4. 定义损失函数与优化器
# ==========================================
# 分类问题通常使用交叉熵损失函数
criterion = nn.CrossEntropyLoss()
# 使用 Adam 优化器来更新网络权重
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# ==========================================
# 5. 模型训练
# ==========================================
print("开始训练模型...")
for epoch in range(epochs):
model.train() # 将模型设置为训练模式
running_loss = 0.0

for batch_idx, (data, targets) in enumerate(train_loader):
# 将数据和标签移动到指定设备
data = data.to(device)
targets = targets.to(device)

# 前向传播:计算模型输出
scores = model(data)
# 计算损失
loss = criterion(scores, targets)

# 反向传播:计算梯度之前先清零
optimizer.zero_grad()
loss.backward()

# 更新权重
optimizer.step()

running_loss += loss.item()

# 打印每个 epoch 的平均损失
print(f"Epoch [{epoch+1}/{epochs}], 平均 Loss: {running_loss/len(train_loader):.4f}")

# ==========================================
# 6. 模型评估 (测试集)
# ==========================================
print("开始评估模型...")
model.eval() # 将模型设置为评估模式 (会关闭 Dropout 等行为)
correct = 0
total = 0

# 评估时不需要计算梯度,可以节省内存并加速
with torch.no_grad():
for data, targets in test_loader:
data = data.to(device)
targets = targets.to(device)

# 获取模型预测结果
scores = model(data)
# 找出每个样本概率最大的类别索引
_, predictions = scores.max(1)

# 统计正确的数量
correct += (predictions == targets).sum().item()
total += targets.size(0)

accuracy = 100 * correct / total
print(f"测试集准确率: {accuracy:.2f}%")

手撕MLP和CNN
https://huan-yin.github.io/2026/04/15/手撕MLP和CNN/
作者
李相越
发布于
2026年4月15日
许可协议