通道注意⼒机制keras_机器翻译的Attention机制
values翻译在机器翻译(Neural Machine Translation)中,Seq2Seq模型将源序列映射到⽬标序列,其中Encoder部分将源序列编码为Context Vector传递给Decoder,Decoder将Context Vector解码为⽬标语⾔的序列。
Encoder-decoder architecture
在输⼊序列很长的情况,在预测⽬标序列的时候,Attention机制可以使得Model能够将注意⼒集中在关键的相关词上,从⽽提升机器翻译模型的效果。
Bahdanau Attention
Bahdanau Attention的公式如下:
Bahdanau Attention的实现代码:
class BahdanauAttention(tf.keras.layers.Layer):
def __init__(self, units):
super(BahdanauAttention, self).__init__()
self.W1 = tf.keras.layers.Dense(units)
self.W2 = tf.keras.layers.Dense(units)
self.V = tf.keras.layers.Dense(1)
def call(self, query, values):
# query hidden state shape == (batch_size, hidden size)
# query_with_time_axis shape == (batch_size, 1, hidden size)
# values shape == (batch_size, max_len, hidden size)
# we are doing this to broadcast addition along the time axis to calculate the score
query_with_time_axis = tf.expand_dims(query, 1)
# score shape == (batch_size, max_length, 1)
# we get 1 at the last axis because we are applying score to self.V
# the shape of the tensor before applying self.V is (batch_size, max_length, units)
score = self.anh(
self.W1(query_with_time_axis) + self.W2(values)))
# attention_weights shape == (batch_size, max_length, 1)
attention_weights = tf.nn.softmax(score, axis=1)
# context_vector shape after sum == (batch_size, hidden_size)
context_vector = attention_weights * values
context_vector = tf.reduce_sum(context_vector, axis=1)
return context_vector, attention_weights
Decoder+Attention
在Decoder过程中引⼊Attention机制,并将Attention的结果与Decoder Input拼接,送⼊GRU完成翻译过程。
Attention mechanism, described in (Luong et al., 2015)
class Decoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
super(Decoder, self).__init__()
self.batch_sz = batch_sz
self.dec_units = dec_units
return_sequences=True,
return_state=True,
recurrent_initializer='glorot_uniform')
self.fc = tf.keras.layers.Dense(vocab_size)
# used for attention
self.attention = BahdanauAttention(self.dec_units)
def call(self, x, hidden, enc_output):
# enc_output shape == (batch_size, max_length, hidden_size)
context_vector, attention_weights = self.attention(hidden, enc_output)
# x shape after passing through embedding == (batch_size, 1, embedding_dim)
x = bedding(x)
# x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size)
x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
# passing the concatenated vector to the GRU
output, state = u(x)
# output shape == (batch_size * 1, hidden_size)
output = tf.reshape(output, (-1, output.shape[2]))
# output shape == (batch_size, vocab)
x = self.fc(output)
return x, state, attention_weights
Encoder
Encoder模块使⽤Word Embedding将单词投影到连续向量空间中,通过GRU输出Whole Sequence Output=(batch size, sequence length, units), Final State=(batch size, units)。
class Encoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
super(Encoder, self).__init__()
self.batch_sz = batch_sz
<_units = enc_units
return_sequences=True,
return_state=True,
recurrent_initializer='glorot_uniform')
def call(self, x, hidden):
x = bedding(x)
output, state = u(x, initial_state = hidden)
return output, state
def initialize_hidden_state(self):
s((self.batch_sz, _units))
Optimizer和Loss Function
Seq2Seq的⽅法把机器翻译问题转换成⼀个分类问题,分类问题常⽤的损失函数是Cross Entropy Loss,Cross Entropy Loss的公式:其中是预测结果,是Ground Truth。Tensorflow中提供的CrossEntropy函数:
tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=False, reduction=losses_utils.ReductionV2.AUTO,
name='sparse_categorical_crossentropy'
)
y_pred为N维向量(N为类别的个数),label为单个数字,如果label也是One-hot之后的值,需要使⽤CategoricalCrossentropy损失函数。
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction='none')
def loss_function(real, pred):
mask = tf.math.logical_not(tf.math.equal(real, 0))
loss_ = loss_object(real, pred)
mask = tf.cast(mask, dtype=loss_.dtype)
loss_ *= mask
duce_mean(loss_)
Training
单个Training Step中的Teacher Forcing⽅法是将Target Word作为Decoder的每个Time Step的输⼊。
@tf.function
def train_step(inp, targ, enc_hidden):
loss = 0
with tf.GradientTape() as tape:
enc_output, enc_hidden = encoder(inp, enc_hidden)
dec_hidden = enc_hidden
dec_input = tf.expand_dims([targ_lang.word_index['']] * BATCH_SIZE, 1)
# Teacher forcing - feeding the target as the next input
for t in range(1, targ.shape[1]):
# passing enc_output to the decoder
predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
loss += loss_function(targ[:, t], predictions)
# using teacher forcing
dec_input = tf.expand_dims(targ[:, t], 1)
batch_loss = (loss / int(targ.shape[1]))
variables = ainable_variables + ainable_variables
gradients = adient(loss, variables)
optimizer.apply_gradients(zip(gradients, variables))
return batch_loss
Training的过程是标准的Training写法。
EPOCHS = 10
for epoch in range(EPOCHS):
start = time.time()
enc_hidden = encoder.initialize_hidden_state()
total_loss = 0
for (batch, (inp, targ)) in enumerate(dataset.take(steps_per_epoch)):
batch_loss = train_step(inp, targ, enc_hidden)
total_loss += batch_loss
if batch % 100 == 0:
print('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1,
batch,
batch_loss.numpy()))
# saving (checkpoint) the model every 2 epochs
if (epoch + 1) % 2 == 0:
checkpoint.save(file_prefix = checkpoint_prefix)
print('Epoch {} Loss {:.4f}'.format(epoch + 1,
total_loss / steps_per_epoch))
print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))
训练输出如下:
Epoch 1 Batch 0 Loss 4.4937
Epoch 1 Batch 100 Loss 2.3472
Epoch 1 Batch 200 Loss 1.9153
Epoch 1 Batch 300 Loss 1.8042
Epoch 1 Loss 2.0265
Time taken for 1 epoch 27.345187664031982 sec
Epoch 2 Batch 0 Loss 1.5260
Epoch 2 Batch 100 Loss 1.5228
Epoch 2 Batch 200 Loss 1.3840
Epoch 2 Batch 300 Loss 1.3131
Epoch 2 Loss 1.3900
Time taken for 1 epoch 15.777411222457886 sec
评估函数
Evaluate函数与Trainning的过程相似,主要区别在于不使⽤Teacher Forcing⽅法,Decoder的每个Time Step的输⼊是前⼀个Step的输出,当遇到结束符时翻译过程结束。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论