一文读懂transform原理及代码实现

0 引言

由于之前的模型,例如Seq2Seq模型有如下缺点,所以导致了transformer的诞生~

  1. 上下文向量信息压缩

    输入序列的全部信息需要被编码成一个固定维度的上下文向量,用这个固定的向量来表示编码器的全部信息,这样会导致信息的损耗。

  2. 无法并行

    Seq2Seq模型在处理长序列时存在短期记忆限制,难以有效捕获和传递长期依赖性。本质上和RNN/LSTM无法并行的原因一样。

    对于 RNN 来说, 任意时刻 t 的输⼊是时刻 t 的输⼊ x(t) 和上⼀时刻的隐藏层输出 h(t-1) , 经过运算后得到当前时刻隐藏层的输出 h(t) , 这个 h(t) 也即将作为下⼀时刻 t+1 的输⼊的⼀部分. 这个计算过程是 RNN 的本质特征, RNN 的历史信息是需要通过这个时间步⼀步⼀步向后传递的. ⽽这就意味着 RNN 序列后⾯的信息只能等到前⾯的计算结束后, 将历史信息通过 hidden state 传递给后⾯才能开始计算, 形成链式的序列依赖关系, ⽆法实现并⾏。

Transformer架构同时解决了Seq2Seq的两大缺陷,既可以并行又利用multi-head attention机制来解决Encoder固定编码的问题,让Decoder在解码的每一步可以通过注意力机制去关注编码器中最重要的部分。

对于 Transformer 结构来说, 在 self-attention 层, ⽆论序列的⻓度是多少, 都可以⼀次性计算所有单词之间的注意⼒关系, 这个 attention 的计算是同步的, 可以实现并⾏.

注意: 不是越先进的模型就越⽆敌, 在很多具体的应⽤中 RNN/LSTM 依然⼤有⽤武之地, 要具体问题具体分析

0.1 预备知识-self attention

(1)self-attention 的机制和原理
self-attention 是⼀种通过⾃身和⾃身进⾏关联的 attention 机制, 从⽽得到更好的 representation 来表达⾃身.
self-attention 是 attention 机制的⼀种特殊情况: 在 self-attention 中, Q=K=V, 序列中的每个单词(token)都和该序列中的其他所有单词 (token)进⾏ attention 规则的计算.
attention 机制计算的特点在于, 可以直接跨越⼀句话中不同距离的 token, 可以远距离的学习到序列的知识依赖和语序结构.

  • 从上图中可以看到, self-attention 可以远距离的捕捉到语义层⾯的特征(it 的指代对象是 animal).
  • 应⽤传统的 RNN, LSTM, 在获取⻓距离语义特征和结构特征的时候, 需要按照序列顺序依次 计算, 距离越远的联系信息的损耗越⼤, 有效提取和捕获的可能性越⼩.
  • 但是应⽤ self-attention 时, 计算过程中会直接将句⼦中任意两个 token 的联系通过⼀个计算 步骤直接联系起来

(2)关于 self-attention 为什么要使⽤(Q, K, V)三元组⽽不是其他形式

⾸先⼀条就是从分析的⻆度看, 查询 Query 是⼀条独⽴的序列信息, 通过关键词 Key的提示作⽤, 得到最终语义的真实值 Value 表达, 数学意义更充分, 完备.

这⾥不使⽤(K, V)或者(V)没有什么必须的理由, 也没有相关的论⽂来严格阐述⽐较试验的结果差异, 所以可以作为开放性问题未来去探索, 只要明确在经典 self-attention实现中⽤的是三元组就好。


1. Transformer架构处理流程

Transformer的核心是其编码器-解码器架构

两个关键组件之间的共生关系,分别负责处理输入序列和生成输出序列。

编码器和解码器中的每一层都包含相同的子层,包括自注意力机制和前馈网络。

这种架构不仅有助于全面理解输入序列,而且能够生成上下文丰富的输出序列。

1.1编码器-解码器架构

Transformer的核心是其编码器-解码器架构——两个关键组件之间的共生关系,分别负责处理输入序列和生成输出序列。编码器和解码器中的每一层都包含相同的子层,包括自注意力机制和前馈网络。这种架构不仅有助于全面理解输入序列,而且能够生成上下文丰富的输出序列。

1.2 Encoder的处理阶段

分别为:输入阶段 –> 核心处理阶段 –> 残差与归一化阶段

输入阶段

1)初始输入

整个 Encoder 部分由 6 个相同的子模块按顺序连接构成。第一个 Encoder 子模块接收来自嵌入(Input Embedding)和位置编码(Position Embedding)组合后的输入(inputs)。这里的输入嵌入通常是将输入的原始数据(比如文本中的单词等)转化为向量表示,而位置编码则是为了让模型能够捕捉到输入序列中元素的位置信息,因为标准的向量表示本身没有位置概念。

(2)后续 Encoder 输入

除了第一个 Encoder 之外的其他 Encoder 子模块,它们从前一个 Encoder 接收相应的输入(inputs),这样就形成了一个顺序传递信息的链路。

核心处理阶段

(1)多头自注意力层处理

每个 Encoder 子模块在接收到输入后,首先会将其传递到多头自注意力层(Multi-Head Self-Attention layer)。在这一层中,通过多头自注意力机制(如前面所述,查询、键、值都来自同一个输入序列自身)去计算输入序列不同位置之间的关联关系,生成相应的自注意力输出。

(2)前馈层处理

自注意力层的输出紧接着被传递到前馈层(Feedforward layer)。前馈层一般是由全连接网络等构成,对自注意力层输出的特征做进一步的非线性变换,提取更复杂、高层次的特征,然后将其输出向上发送到下一个编码器(如果不是最后一个 Encoder 的话),以便后续 Encoder 子模块继续进行处理。

残差与归一化阶段

(1)残差连接(Residual Connection)

自注意力层和前馈子层(Feedforward sublayer)均配备了残差快捷链路。从网络拓扑结构来看,这种连接方式构建了一种并行通路,使得输入信号能够以残差的形式参与到每一层的输出计算中。

对于自注意力层,其输入会与自注意力层的输出进行相加操作( 假设自注意力层输入为 x,输出为 y,经过残差连接后变为 x + y )

同样,前馈层的输入也会和前馈层的输出进行相加。残差连接(Residual Connection)有助于缓解深度网络训练过程中的梯度消失或梯度爆炸问题,使得网络能够更容易地训练深层模型,并且能够让信息更顺畅地在网络中传递。

(2)层归一化(Layer Norm)

在残差连接之后,紧跟着会进行层归一化操作。层归一化是对每一层的神经元的输入进行归一化处理,它可以加速网络的收敛速度、提高模型的泛化能力等,使得模型训练更加稳定、高效。经过层归一化后的结果就是当前 Encoder 子模块最终的输出,然后传递给下一个 Encoder 子模块或者后续的其他模块(比如在 Encoder-Decoder 架构中传递给 Decoder 部分等情况)。

1.3 Decoder解码流程

  1. 输出嵌入的右向偏移

    在开始处理输入序列之前,模型对输出嵌入进行向右偏移一个位置,确保在训练阶段,解码器内的每个符号都能正确地获取之前生成符号的上下文信息。

  2. 位置编码的整合

    仿照编码器的设计,模型将位置编码与输出嵌入相结合,以此保留符号的序列顺序信息。

  3. 带掩码的多头自注意力机制

    解码器利用带掩码的多头自注意力机制,专注于输入序列的相关片段以及之前产生的符号。在训练过程中,通过掩码技术,防止模型对未来符号的预知,确保每个符号仅能参考其之前的符号。

  4. 编码器-解码器注意力交互

    除了带掩码的自注意力机制外,解码器还融合了编码器-解码器注意力交互,这使得解码器能够聚焦于输入序列的重要部分,进而生成受输入语境影响的输出符号。

  5. 基于位置的前馈网络

    在自注意力层之后,解码器对每个符号独立施加位置编码的前馈网络。该网络旨在捕捉输入和已生成符号之间的复杂模式与关联,以助力精确输出序列的构建。

Decoder 部分相比于 Encoder ,其结构中增加了一个名为“Mask”的多头注意力子层。与标准的多头注意力机制中的缩放点积注意力不同,Mask-Multi-Head-Attention在应用缩放操作之后引入了一个Mask步骤

这一操作的必要性源于解码过程中的特性:解码并非一次性完成,而是按时间步逐个生成输出。在生成某个特定时间步的输出时,模型不应访问该时间步之后的信息,因此通过Mask操作来隐藏未来的序列元素,确保解码过程的一致性和正确性。

在解码时的流程为:

  • 假设当前已经解码的序列为 s1,s2,…,st−1s1,s2,…,s**t−1 把该序列做词向量和位置向量嵌入
  • 对上述向量做 Masked Multi-Head Attention,把得到的结果作为 Q
  • Encoder 端的输出向量看做 K,V
  • 结合 Q,K,V 做 Multi-Head Attention 和 Feed-Forward Networks 等操作
  • 重复 decoder 部分 的子结构得到输出,而后解码得到输出词的概率

2. Transformer 各个模块细节

❤️2.1Encoder模块

  • 经典的Transformer架构中的Encoder模块包含6个Encorder Block。

  • 每个 Encoder Block 包含两个⼦模块, 分别是多头⾃注意⼒层(Multi-Head Self-Attention), 和前馈全连接层(Position-wise Feed-Forward Network).

  • 多头⾃注意⼒层采⽤的是⼀种 Scaled Dot-Product Attention 的计算⽅式, 实验结果表 明, Multi-head 可以在更细致的层⾯上提取不同 head 的特征, ⽐单⼀head 提取特征的 效果更佳.

  • 前馈全连接层是由两个全连接层组成, 线性变换中间增添⼀个 Relu 激活函数, 具体的 维度采⽤ 4 倍关系, 即多头⾃注意⼒的 d_model=512, 则层内的变换维度d_ff=2048

Multi-Head Self-Attention由 Scaled Dot-product Attention 和 Multi-Head Attention 以及 Self Attention 和 Add & Norm 组成。

2.1.1多头⾃注意⼒层(Multi-Head Self-Attention)

(1)采⽤ Multi-head Attention 的原因
原始论⽂中提到进⾏ Multi-head Attention 的原因是将模型分为多个头, 可以形成多个⼦空间间, 让模型去关注不同⽅⾯的信息, 最后再将各个⽅⾯的信息综合起来得到更好的效果.
多个头进⾏ attention 计算最后再综合起来, 类似于 CNN 中采⽤多个卷积核的作⽤, 不同的卷 积核提取不同的特征, 关注不同的部分, 最后再进⾏融合.

直观上讲, 多头注意⼒有助于神经⽹络捕捉到更丰富的特征信息.

(2)Multi-head Attention 的计算⽅式

1.Multi-head Attention 和单⼀ head 的 Attention 唯⼀的区别就在于, 其对特征张量的最后⼀个维度进⾏了分割, ⼀般是对词嵌⼊的 embedding_dim=512 进⾏切割成 head=8, 这样每⼀个 head 的嵌⼊维度就是 512/8=64, 后续的 Attention 计算公式完全⼀致, 只不过是在 64 这个维度上进⾏⼀系列的矩阵运算⽽已。

2.在 head=8 个头上分别进⾏注意⼒规则的运算后, 简单采⽤拼接 concat 的⽅式对结果张量进 ⾏融合就得到了 Multi-head Attention 的计算结果

Self-attention 归⼀化和放缩

训练上的意义:随着词嵌⼊维度 d_k 的增⼤, q * k 点积后的结果也会增⼤, 在训练时会将 softmax 函数推⼊梯度⾮常⼩的区域, 可能出现梯度消失的现象, 造成模型收敛困难.

数学上的意义: 假设 q 和 k 的统计变量是满⾜标准正态分布的独⽴随机变量, 意味着q 和 k 满⾜均 值为 0, ⽅差为 1。 那么 q 和 k 的点积结果就是均值为 0, ⽅差为 , 为了抵消这种⽅差被放⼤ 倍的影响, 在计算中主动将点积缩放 , 这样点积后的结果依然满⾜均值为 0, ⽅差为 1。

最终的结论:通过数学上的技巧将⽅差控制在 1, 也就有效的控制了点积结果的发散, 也就控制了对应的梯度消失的问题!

2.1.2 Add & Norm 模块

  • Add & Norm 模块接在每⼀个 Encoder Block 和 Decoder Block 中的每⼀个⼦
    层的后⾯.
  • 对于每⼀个 Encoder Block, ⾥⾯的两个⼦层后⾯都有 Add & Norm.
  • 对于每⼀个 Decoder Block, ⾥⾯的三个⼦层后⾯都有 Add & Norm.
  • Add 表示残差连接, 作⽤是为了将信息⽆损耗的传递的更深, 来增强模型的拟合能⼒.
  • Norm 表示 LayerNorm, 层级别的数值标准化操作, 作⽤是防⽌参数过⼤过⼩导致的学习过程异常 , 模型收敛特别慢的问题.

2.1.3 位置编码器 Positional Encoding

Transformer 中采⽤三⻆函数来计算位置编码.
因为三⻆函数是周期性函数, 不受序列⻓度的限制, ⽽且这种计算⽅式可以对序列中不同位置的编码的重要程度同等看待

❤️2.2 Decoder模块

  • 经典的 Transformer 架构中的 Decoder 模块包含 6 个 Decoder Block.
  • 每个 Decoder Block 包含 3 个⼦模块, 分别是多头⾃注意⼒层, Encoder-Decoder
    Attention 层, 和前馈全连接层.
    • 多头⾃注意⼒层采⽤和 Encoder 模块⼀样的 Scaled Dot-Product
      Attention 的计算⽅ 式, 最⼤的 区别在于需要添加 look-ahead-mask, 即遮掩”未来的信息”.
    • Encoder-Decoder Attention 层和上⼀层多头⾃注意⼒层最主要的区别在于Q != K = V, 矩阵 Q 来源于上⼀层 Decoder Block 的输出, 同时 K, V 来源于 Encoder 端的输出.
    • 前馈全连接层和 Encoder 中完全⼀样.

在 Transformer 结构中的 Decoder 模块的输⼊, 区分于不同的 Block, 最底层的Block 输⼊有其特殊的地⽅。第⼆层到第六层的输⼊⼀致, 都是上⼀层的输出和Encoder 的输出。

最底层的 Block 在训练阶段, 每⼀个 time step 的输⼊是上⼀个 time step 的输⼊加上真实标 签序列向后移⼀位. 具体来看, 就是每⼀个 time step 的输⼊序列会越来越⻓, 不断的将之前的 输⼊融合进来.

1
2
3
4
假设现在的真实标签序列等于"How are you?", 
time step=1时, 输⼊张量为⼀个特殊的token, ⽐如"SOS"; 当time step=2时, 输⼊张量为"SOS How";
time step=3时, 输⼊张量为"SOS How are";
以此类推...

最底层的 Block 在训练阶段, 真实的代码实现中, 采⽤的是 MASK 机制来模拟输⼊序列不断添 加的过程.

最底层的 Block 在预测阶段, 每⼀个 time step 的输⼊是从 time step=0 开始, ⼀直到上⼀个 time step 的预测值的累积拼接张量. 具体来看, 也是随着每⼀个
time step 的输⼊序列会越来越⻓. 相⽐于训练阶段最⼤的不同是这⾥不断拼接进来的 token 是每⼀个 time step 的预测值, ⽽不是训练阶段每⼀个 time step 取得的 groud truth 值

1
2
3
4
5
6
7
当time step=1时, 输⼊的input_tensor="SOS", 预测出来的输出值是output_tensor="What";
当time step=2时, 输⼊的input_tensor="SOS What", 预测出来的输出值是output_tensor="is";
当time step=3时, 输⼊的input_tensor="SOS What is", 预测出来的输出值是output_tensor="the";
当time step=4时, 输⼊的input_tensor="SOS What is the", 预测出来的输出值是output_tensor="matter";
当time step=5时, 输⼊的input_tensor="SOS What is the matter", 预测
出来的输出值是output_tensor="?";
当time step=6时, 输⼊的input_tensor="SOS What is the matter ?", 预测出来的输出值是output_tensor="EOS", 代表句⼦的结束符, 说明解码结束, 预测结束.

3. 代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data

# Encoder_input Decoder_input Decoder_output(预测下一个字符)
sentences = [['我 是 学 生 P' , 'S I am a student' , 'I am a student E'], # S: 开始符号
['我 喜 欢 学 习', 'S I like learning P', 'I like learning P E'], # E: 结束符号
['我 是 男 生 P' , 'S I am a boy' , 'I am a boy E']] # P: 占位符号,如果当前句子不足固定长度用P占位 pad补0

# 以下的一个batch中是sentences[0,1]
src_vocab = {'P':0, '我':1, '是':2, '学':3, '生':4, '喜':5, '欢':6,'习':7,'男':8} # 词源字典 字:索引
src_idx2word = {src_vocab[key]: key for key in src_vocab}
src_vocab_size = len(src_vocab) # 字典字的个数

# 生成目标中 'S'是0填充的
tgt_vocab = {'S':0, 'E':1, 'P':2, 'I':3, 'am':4, 'a':5, 'student':6, 'like':7, 'learning':8, 'boy':9}
idx2word = {tgt_vocab[key]: key for key in tgt_vocab} # 把目标字典转换成 索引:字的形式
tgt_vocab_size = len(tgt_vocab) # 目标字典尺寸

src_len = len(sentences[0][0].split(" ")) # Encoder输入的最大长度 5
tgt_len = len(sentences[0][1].split(" ")) # Decoder输入输出最大长度 5

# 把sentences 转换成字典索引
def make_data(sentences):
enc_inputs, dec_inputs, dec_outputs = [], [], []
for i in range(len(sentences)): # 遍历每句话
enc_input = [[src_vocab[n] for n in sentences[i][0].split()]] # Encoder_input 索引
dec_input = [[tgt_vocab[n] for n in sentences[i][1].split()]] # Decoder_input 索引
dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]] # Decoder_output 索引
enc_inputs.extend(enc_input)
dec_inputs.extend(dec_input)
dec_outputs.extend(dec_output)
return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)
enc_inputs, dec_inputs, dec_outputs = make_data(sentences) # [3,5], [3,5], [3,5] 只是恰巧长度都为5,enc_inputs、dec_inputs长度可以不一样
print(enc_inputs)
print(dec_inputs)
print(dec_outputs)

'''
sentences 里一共有三个训练数据,中文->英文。把Encoder_input、Decoder_input、Decoder_output转换成字典索引,
例如"学"->3、“student”->6。再把数据转换成batch大小为2的分组数据,3句话一共可以分成两组,一组2句话、一组1句话。src_len表示中文句子
固定最大长度,tgt_len 表示英文句子固定最大长度。
'''
#自定义数据集函数
class MyDataSet(Data.Dataset):
def __init__(self, enc_inputs, dec_inputs, dec_outputs):
super(MyDataSet, self).__init__()
self.enc_inputs = enc_inputs
self.dec_inputs = dec_inputs
self.dec_outputs = dec_outputs

def __len__(self):
return self.enc_inputs.shape[0]

def __getitem__(self, idx):
return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]

loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, False)

d_model = 512 # 字 Embedding 的维度
d_ff = 2048 # 前向传播隐藏层维度
d_k = d_v = 64 # K(=Q), V的维度. V的维度可以和K=Q不一样
n_layers = 6 # 有多少个encoder和decoder
n_heads = 8 # Multi-Head Attention设置为8

# 位置嵌入,position Embedding
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding,self).__init__()
self.dropout = nn.Dropout(p=dropout)
pos_table = np.array([
[pos / np.power(10000, 2 * i / d_model) for i in range(d_model)]
if pos != 0 else np.zeros(d_model) for pos in range(max_len)])
pos_table[1:, 0::2] = np.sin(pos_table[1:, 0::2]) # 字嵌入维度为偶数时
pos_table[1:, 1::2] = np.cos(pos_table[1:, 1::2]) # 字嵌入维度为奇数时
self.pos_table = torch.FloatTensor(pos_table) # enc_inputs: [seq_len, d_model]

def forward(self,enc_inputs):
"""_summary_

Args:
enc_inputs (_type_): nn.embedding() [seq_len, batch_size, d_model]

Returns:
_type_: _description_
"""
enc_inputs += self.pos_table[:enc_inputs.size(1),:] # 两个embedding相加,参考https://www.cnblogs.com/d0main/p/10447853.html
return self.dropout(enc_inputs)

'''
Mask句子中没有实际意义的占位符,例如’我 是 学 生 P’ ,P对应句子没有实际意义,所以需要被Mask,Encoder_input 和Decoder_input占位符
都需要被Mask。
这就是为了处理,句子不一样长,但是输入有需要定长,不够长的pad填充,但是计算又不需要这个pad,所以mask掉

这个函数最核心的一句代码是 seq_k.data.eq(0),这句的作用是返回一个大小和 seq_k 一样的 tensor,只不过里面的值只有 True 和 False。如
果 seq_k 某个位置的值等于 0,那么对应位置就是 True,否则即为 False。举个例子,输入为 seq_data = [1, 2, 3, 4, 0],
seq_data.data.eq(0) 就会返回 [False, False, False, False, True]
'''
def get_attn_pad_mask(seq_q, seq_k):
"""
此时字还没表示成嵌入向量
句子0填充
seq_q中的每个字都要“看”一次seq_k中的每个字
Args: 在Encoder_self_att中,seq_q,seq_k 就是enc_input
seq_q (_type_): [batch, enc_len] [batch, 中文句子长度]
seq_k (_type_): [batch, enc_len] [batch, 中文句子长度]
在Decoder_self_att中,seq_q,seq_k 就是dec_input, dec_input
seq_q (_type_): [batch, tgt_len] [batch, 英文句子长度]
seq_k (_type_): [batch, tgt_len] [batch, 英文句子长度]
在Decoder_Encoder_att中,seq_q,seq_k 就是dec_input, enc_input
seq_q (_type_): [batch, tgt_len] [batch, 中文句子长度]
seq_k (_type_): [batch, enc_len] [batch, 英文句子长度]

Returns:
_type_: [batch_size, len_q, len_k] 元素:T or F
"""
batch_size, len_q = seq_q.size()# seq_q 用于升维,为了做attention,mask score矩阵用的
batch_size, len_k = seq_k.size()
pad_attn_mask = seq_k.data.eq(0) # 判断 输入那些词index含有P(=0),用1标记 [len_k, d_model]元素全为T,F
pad_attn_mask = pad_attn_mask.unsqueeze(1) #[batch, 1, len_k]
pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k) # 扩展成多维度 [batch_size, len_q, len_k]
return pad_attn_mask

'''
# Decoder输入Mask
用来Mask未来输入信息,返回的是一个上三角矩阵。比如我们在中英文翻译时候,会先把"我是学生"整个句子输入到Encoder中,得到最后一层的输出
后,才会在Decoder输入"S I am a student"(s表示开始),但是"S I am a student"这个句子我们不会一起输入,而是在T0时刻先输入"S"预测,
预测第一个词"I";在下一个T1时刻,同时输入"S"和"I"到Decoder预测下一个单词"am";然后在T2时刻把"S,I,am"同时输入到Decoder预测下一个单
词"a",依次把整个句子输入到Decoder,预测出"I am a student E"。
'''
def get_attn_subsequence_mask(seq):
"""
生成上三角Attention矩阵
Args:
seq (_type_): [batch_size, tgt_len]

Returns:
_type_: _description_
"""
attn_shape = [seq.size(0), seq.size(1), seq.size(1)] # 生成上三角矩阵,[batch_size, tgt_len, tgt_len]
subsequence_mask = np.triu(np.ones(attn_shape), k=1) # 得到主对角线向上平移一个距离的对角线(下三角包括对角线全为0)
subsequence_mask = torch.from_numpy(subsequence_mask).byte() # [batch_size, tgt_len, tgt_len]
return subsequence_mask

# 计算注意力信息、残差和归一化
class ScaledDotProductAttention(nn.Module):
def __init__(self):
super(ScaledDotProductAttention, self).__init__()

def forward(self, Q, K, V, attn_mask):
'''
注意!: d_q和d_k一定一样
d_v和d_q、d_k可以不一样
len_k和len_v的长度一定是一样的(翻译任务中,k,v要求都从中文文本生成)
:param Q: [batch_size, n_heads, len_q, d_k]
:param K: [batch_size, n_heads, len_k, d_k] len_k和len_v的长度一定是一样的
:param V: [batch_size, n_heads, len_v, d_v(和d_q、d_k不一定一样,但d_q和d_k可以一样)]
:param attn_mask: [batch_size, n_heads, len_q, len_k] attn_mask此时还是T or F

:return: [batch_size, n_heads, len_q, d_v], [batch_size, n_heads, len_q, len_k]
'''
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size, n_heads, len_q, len_k]
scores.masked_fill_(attn_mask, -1e9) # 如果是停用词P就等于负无穷 在原tensor修改
attn = nn.Softmax(dim=-1)(scores) # PADmask位置分数变为0
# [batch_size, n_heads, len_q, len_k] * [batch_size, n_heads, len_v, d_v] = [batch_size, n_heads, len_q, d_v]
context = torch.matmul(attn, V) # 注意!len_k和len_v的长度一定是一样的
return context, attn

# 多头自注意力机制
# 拼接之后 输入fc层 加入残差 Norm
class MultiHeadAttention(nn.Module):
def __init__(self):
super(MultiHeadAttention, self).__init__()
self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)

self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)

def forward(self, input_Q, input_K, input_V, attn_mask):
'''
enc_self_attn_mask里 input_Q,input_K,input_V: 词嵌入、位置嵌入之后的矩阵都是 [batch_size, src_len, d_model]
dec_self_attn_mask里 input_Q,input_K,input_V: 词嵌入、位置嵌入之后的矩阵都是 [batch_size, tag_len, d_model]
dec_enc_attn_mask里 input_Q,input_K,input_V: 词嵌入、位置嵌入之后的矩阵
input_Q: dec_input [batch_size, tag_len, d_model]
input_K: enc_output [batch_size, src_len, d_model]
input_V: enc_output [batch_size, src_len, d_model]
:param attn_mask:
enc_self_attn_mask: [batch_size, src_len, src_len]元素全为T or F, T的位置是要掩码(PAD填充)的位置
dec_self_attn_mask: [batch_size, tgt_len, tgt_len]元素全为T or F, T的位置是要掩码(PAD填充)的位置
dec_enc_attn_mask: [batch_size, tgt_len, src_len]元素全为T or F, T的位置是要掩码(PAD填充)的位置
:return: [batch_size, len_q, d_model]
'''
residual, batch_size = input_Q, input_Q.size(0)
Q = self.W_Q(input_Q).view(batch_size, -1, n_heads, d_k).transpose(1,2) # Q: [batch_size, n_heads, len_q, d_k]
K = self.W_K(input_K).view(batch_size, -1, n_heads, d_k).transpose(1,2) # K: [batch_size, n_heads, len_k, d_k]
V = self.W_V(input_V).view(batch_size, -1, n_heads, d_v).transpose(1,2) # V: [batch_size, n_heads, len_v, d_v]
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1) # attn_mask : [batch_size, n_heads, len_q, len_k]
context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask) # context: [batch_size, n_heads, len_q, d_v]
# attn: [batch_size, n_heads, len_q, len_k]
# 拼接多头的结果
context = context.transpose(1, 2).reshape(batch_size, -1, n_heads * d_v) # context: [batch_size, len_q, n_heads * d_v(d_model)]
output = self.fc(context) # d_v fc之后变成d_model -> [batch_size, len_q, d_model]
return nn.LayerNorm(d_model)(output + residual), attn


'''
## 前馈神经网络
输入inputs ,经过两个全连接层,得到的结果再加上 inputs (残差),再做LayerNorm归一化。LayerNorm归一化可以理解层是把Batch中每一句话
进行归一化。
'''
class FF(nn.Module):
def __init__(self):
super(FF, self).__init__()
self.fc = nn.Sequential(
nn.Linear(d_model, d_ff, bias=False),
nn.ReLU(),
nn.Linear(d_ff, d_model, bias=False)
)

def forward(self, inputs): # inputs: [batch_size, seq_len, d_model]
residual = inputs
output = self.fc(inputs)
return nn.LayerNorm(d_model)(output + residual) # [batch_size, seq_len, d_model]

## encoder layer(block)
class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention() # 多头注意力机制
self.pos_ffn = FF() # 前馈神经网络

def forward(self, enc_inputs, enc_self_attn_mask): # enc_inputs: [batch_size, src_len, d_model]
'''
:param enc_inputs: [batch_size, src_len, d_model] 词嵌入、位置嵌入之后的输入矩阵
:param enc_self_attn_mask: [batch_size, src_len, src_len]元素全为T or F, T的是要掩码(PAD填充)的位置
:return:
'''
#输入3个enc_inputs分别与W_q、W_k、W_v相乘得到Q、K、V # enc_self_attn_mask: [batch_size, src_len, src_len]
enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, # enc_outputs: [batch_size, src_len, d_model],
enc_self_attn_mask) # attn: [batch_size, n_heads, src_len, src_len]
# 多头自注意力机制之后(Add & Norm之后),进行FF(Add & Norm)
enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size, src_len, d_model]
return enc_outputs, attn

'''
## Encoder
第一步,中文字索引进行Embedding,转换成512维度的字向量。
第二步,在子向量上面加上位置信息。
第三步,Mask掉句子中的占位符号。
第四步,通过6层的encoder(上一层的输出作为下一层的输入)。
'''
class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.src_emb = nn.Embedding(src_vocab_size, d_model)
self.pos_emb = PositionalEncoding(d_model)
self.layers = nn.ModuleList(
[EncoderLayer() for _ in range(n_layers)]
)

def forward(self, enc_inputs):
'''
enc_inputs: [batch_size, src_len] 元素是字典词index
'''
enc_outputs = self.src_emb(enc_inputs) # [batch_size, src_len, d_model]
enc_outputs = self.pos_emb(enc_outputs.transpose(0, 1)).transpose(0, 1) # [batch_size, src_len, d_model]
enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs) # [batch_size, src_len, src_len]
enc_self_attns = []
for layer in self.layers:
# enc_outputs: [batch_size, src_len, d_model], enc_self_attn: [batch_size, n_heads, src_len, src_len]
enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask) # 一个EncoderBlock输出,注意力分数矩阵
enc_self_attns.append(enc_self_attn) # 记录注意力分数矩阵
# enc_outputs: [batch_size, src_len, d_model]
# enc_self_attn: [batch_size, n_heads, src_len, src_len]
return enc_outputs, enc_self_attns




# 测试
'''
enc_inputs:
tensor([[1, 2, 3, 4, 0],
[1, 5, 6, 3, 7],
[1, 2, 8, 4, 0]])
'''
#enc_outputs, enc_self_attns = Encoder()(enc_inputs)
#print(enc_outputs.shape) # torch.Size([3, 5, 512])





# decoder layer(block)
# decoder两次调用MultiHeadAttention时,第一次调用传入的 Q,K,V 的值是相同的,都等于dec_inputs,第二次调用 Q 矩阵是来自Decoder的
# 输入。K,V 两个矩阵是来自Encoder的输出,等于enc_outputs。
class DecoderLayer(nn.Module):
def __init__(self):
super(DecoderLayer, self).__init__()
self.dec_self_attn = MultiHeadAttention()
self.dec_enc_attn = MultiHeadAttention()
self.pos_ffn = FF()

def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
"""
解码器一个Block包含两个多投资注意力机制
Args:
dec_inputs (_type_): [batch_size, tgt_len, d_model]
enc_outputs (_type_): [batch_size, src_len, d_model] # Encoder的输出
dec_self_attn_mask (_type_): [batch_size, tgt_len, tgt_len]
dec_enc_attn_mask (_type_): [batch_size, tgt_len, src_len]

Returns:
_type_: _description_
"""
# dec_outputs: [batch_size, tgt_len, d_model]
# dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len]
dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs,
dec_self_attn_mask)

# decoder自注意力之后的值作为Q值。K,V来自Encoder的输出
# dec_outputs: [batch_size, tgt_len, d_model]
# dec_enc_attn: [batch_size, h_heads, tgt_len, src_len]
dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs,
dec_enc_attn_mask)

dec_outputs = self.pos_ffn(dec_outputs) # dec_outputs: [batch_size, tgt_len, d_model]
return dec_outputs, dec_self_attn, dec_enc_attn


'''
# Decoder
第一步,英文字索引进行Embedding,转换成512维度的字向量。
第二步,在子向量上面加上位置信息。
第三步,Mask掉句子中的占位符号和输出顺序.
第四步,通过6层的decoder(上一层的输出作为下一层的输入)
'''


class Decoder(nn.Module):
def __init__(self):
super(Decoder, self).__init__()
self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)
self.pos_emb = PositionalEncoding(d_model)
self.layers = nn.ModuleList([DecoderLayer() for _ in range(n_layers)])

def forward(self, dec_inputs, enc_inputs, enc_outputs):
'''
enc_intpus: [batch_size, src_len]
dec_inputs: [batch_size, tgt_len]
enc_outputs: [batch_size, src_len, d_model]
'''
dec_outputs = self.tgt_emb(dec_inputs) # [batch_size, tgt_len, d_model]
dec_outputs = self.pos_emb(dec_outputs.transpose(0, 1)).transpose(0, 1) # [batch_size, tgt_len, d_model]
# PAD 0填充Mask掉 (Decoder输入序列的pad mask矩阵(这个例子中decoder是没有加pad的,实际应用中都是有pad填充的))
# Decoder中 0填充的位置是'S',也就是第一个位置要Mask掉,为true
dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs) # [batch_size, tgt_len, tgt_len] T or F
'''
此时的一个batch:['S I am a student', 'S I like learning P']
dec_self_attn_pad_mask:
tensor([[[ True, False, False, False, False],
[ True, False, False, False, False],
[ True, False, False, False, False],
[ True, False, False, False, False],
[ True, False, False, False, False]],

[[ True, False, False, False, False],
[ True, False, False, False, False],
[ True, False, False, False, False],
[ True, False, False, False, False],
[ True, False, False, False, False]]])'''
# Masked Self_Attention:当前时刻是看不到未来的信息的
dec_self_attn_subsequence_mask = get_attn_subsequence_mask(dec_inputs) # [batch_size, tgt_len, tgt_len] 下三角包括对角线为0,上三角为1
'''
tensor([[[0, 1, 1, 1, 1],
[0, 0, 1, 1, 1],
[0, 0, 0, 1, 1],
[0, 0, 0, 0, 1],
[0, 0, 0, 0, 0]],

[0, 1, 1, 1, 1],
[0, 0, 1, 1, 1],
[0, 0, 0, 1, 1],
[0, 0, 0, 0, 1],
[0, 0, 0, 0, 0]]], dtype=torch.uint8)'''
# Decoder中把两种mask矩阵相加(既屏蔽了pad的信息,也屏蔽了未来时刻的信息)
# torch.gt() 比较Tensor1和Tensor2的每一个元素,并返回一个0-1值.若Tensor1中的元素大于Tensor2中的元素,则结果取1,否则取0
dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequence_mask),
0) # [batch_size, tgt_len, tgt_len]
'''tensor([[[ True, True, True, True, True],
[ True, False, True, True, True],
[ True, False, False, True, True], # 注意到之前的,当然不包括开始字符'S'。但是后面PAD的位置也会注意到前面PAD的位置
[ True, False, False, False, True],
[ True, False, False, False, False]],

[ True, True, True, True, True],
[ True, False, True, True, True],
[ True, False, False, True, True],
[ True, False, False, False, True],
[ True, False, False, False, False]]])'''
# 这个mask主要用于encoder-decoder attention层
# get_attn_pad_mask主要是enc_inputs的pad mask矩阵(因为enc是处理K,V的,求Attention时是用v1,v2,..vm去加权的,
# 要把pad对应的v_i的相关系数设为0,这样注意力就不会关注pad向量)
# dec_inputs只是提供expand的size的
dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs) # [batc_size, tgt_len, src_len]
'''
此时的一个batch: 'S I am a student' 'S I like learning P'
下面的tensor是上面两个dec_input样本对应的enc_input的掩码矩阵
tensor([[[False, False, False, False, False],
[False, False, False, False, False],
[False, False, False, False, False],
[False, False, False, False, False],
[False, False, False, False, False]]

[[False, False, False, False, True],
[False, False, False, False, True],
[False, False, False, False, True],
[False, False, False, False, True],
[False, False, False, False, True]]
])'''

dec_self_attns, dec_enc_attns = [], []
for layer in self.layers:
# dec_outputs: [batch_size, tgt_len, d_model],
# dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len],
# dec_enc_attn: [batch_size, h_heads, tgt_len, src_len]
dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask,
dec_enc_attn_mask)
dec_self_attns.append(dec_self_attn)
dec_enc_attns.append(dec_enc_attn)
return dec_outputs, dec_self_attns, dec_enc_attns


'''
# Transformer
Trasformer的整体结构,输入数据先通过Encoder,再通过Decoder,
最后把输出进行多分类,分类数为英文字典长度,也就是判断每一个字的概率。
'''


class Transformer(nn.Module):
def __init__(self):
super(Transformer, self).__init__()
self.Encoder = Encoder()
self.Decoder = Decoder()
# 翻译到英文词的分类
self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False)

def forward(self, enc_inputs, dec_inputs):
"""
transformer
Args:
enc_inputs (_type_): [batch_size, src_len]
dec_inputs (_type_): [batch_size, tgt_len]

Returns:
_type_: _description_
"""
# encoder部分
# enc_outputs: [batch_size, src_len, d_model],
# enc_self_attns: [n_layers, batch_size, n_heads, src_len, src_len]
enc_outputs, enc_self_attns = self.Encoder(enc_inputs)

# decoder部分
# dec_outpus : [batch_size, tgt_len, d_model],
# dec_self_attns: [n_layers, batch_size, n_heads, tgt_len, tgt_len],
# dec_enc_attn : [n_layers, batch_size, tgt_len, src_len]
dec_outputs, dec_self_attns, dec_enc_attns = self.Decoder(dec_inputs, enc_inputs, enc_outputs)

dec_logits = self.projection(dec_outputs) # dec_logits: [batch_size, tgt_len, tgt_vocab_size]
dec_logits = dec_logits.view(-1, dec_logits.size(-1)) # dec_logits: [batch_size*tgt_len, tgt_vocab_size]
return dec_logits, enc_self_attns, dec_self_attns, dec_enc_attns


'''
# 定义网络
'''
model = Transformer()
criterion = nn.CrossEntropyLoss(ignore_index=0) # 忽略 占位符 索引为0.
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)

'''
# 训练Transformer
'''
for epoch in range(50):
for enc_inputs, dec_inputs, dec_outputs in loader:
enc_inputs, dec_inputs, dec_outputs = enc_inputs, dec_inputs, dec_outputs # [2,5] [2,5] [2,5]
outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs)
loss = criterion(outputs, dec_outputs.view(-1)) # outputs: [batch_size*tgt_len, tgt_vocab_size]
print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss))
optimizer.zero_grad()
loss.backward()
optimizer.step()


# 测试

def test(model, enc_input, start_symbol):
'''
enc_input: [1, src_len] 只取一个例子
'''
# 先得到Encoder的输出
enc_outputs, enc_self_attns = model.Encoder(enc_input) # [1,src_len, d_model] []

dec_input = torch.zeros(1, tgt_len).type_as(enc_input.data) # [1, tgt_len]

next_symbol = start_symbol

for i in range(0, tgt_len):
dec_input[0][i] = next_symbol

# 然后一个一个解码
dec_outputs, _, _ = model.Decoder(dec_input, enc_input, enc_outputs) # [1, tgt_len, d_model]

projected = model.projection(dec_outputs) # [1, tgt_len, tgt_voc_size]
prob = projected.squeeze(0).max(dim=-1, keepdim=False)[1] # [tgt_len][索引]
next_word = prob.data[i] # 不断地预测所有字,但是只取下一个字
next_symbol = next_word.item()
return dec_input


enc_inputs, _, _ = next(iter(loader))
# enc_input只取一个例子[1]
# 预测dec_input
# dec_input全部预测出来之后,在输入Model预测 dec_output
predict_dec_input = test(model, enc_inputs[1].view(1, -1), start_symbol=tgt_vocab["S"]) # [1, tgt_len]
# 然后走一遍完整的过程
predict, _, _, _ = model(enc_inputs[1].view(1, -1), predict_dec_input) # [tat_len, tgt_voc_size]

predict = predict.data.max(1, keepdim=True)[1]
print([src_idx2word[int(i)] for i in enc_inputs[1]], '->', [idx2word[n.item()] for n in predict.squeeze()])

觉得不错的话,支持一根棒棒糖吧 ୧(๑•̀⌄•́๑)૭



wechat pay



alipay

一文读懂transform原理及代码实现
http://yuting0907.github.io/posts/8c15d6cc.html
作者
Echo Yu
发布于
2025年1月13日
许可协议