BiLSTM-CRF Named Entity Recognition with TensorFlow (using the MSRA NER dataset)
GitHub address:
1. Getting familiar with the data
The MSRA dataset consists of three files in total:
Sample from the first file (words annotated with /tag):
当/o 希望工程/o 救助/o 的/o 百万/o 儿童/o 成长/o 起来/o ,/o 科教/o 兴/o 国/o 蔚然成风/o 时/o ,/o 今天/o 有/o 收藏/o 价值/o 的/o 书/o 你/o 没/o 买/o ,/o 明日/o 就/o 叫/o 你/o 悔不当初/o !/o 藏书/o 本来/o 就/o 是/o 所有/o 传统/o 收藏/o 门类/o 中/o 的/o 第一/o 大户/o ,/o 只是/o 我们/o 结束/o 温饱/o 的/o 时间/o 太/o 短/o 而已/o 。/o
因/o 有关/o 日/ns 寇/o 在/o 京/ns 掠夺/o 文物/o 详情/o ,/o 藏/o 界/o 较为/o 重视/o ,/o 也是/o 我们/o 收藏/o 北京/ns 史料/o 中/o 的/o 要件/o 之一/o 。/o
Sample from the second file (raw, untagged sentences):
今天的演讲会是由哈佛大学费正清东亚研究中心主任傅高义主持的。
Sample from the third file (segments annotated with /tag):
今天的演讲会是由/o 哈佛大学费正清东亚研究中心/nt 主任/o 傅高义/nr 主持的。/o
2. Data preprocessing
Code:
#coding:utf-8
import os
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # the directory two levels above this file, here: ner
import sys
sys.path.append(BASE_DIR)
print(BASE_DIR)
import codecs
import re
import pandas as pd
import numpy as np
from config.globalConfig import *
#============================Step 1: tag every single character===================================
def wordtag():
    # utf-8-sig is used because the file was saved with a BOM (Byte Order Mark: a \ufeff at the
    # head of the file), and this encoding strips it off
    input_data = codecs.open(os.path.join(PATH,'data/'),'r','utf-8-sig') # codecs.open generally avoids encoding trouble
    output_data = codecs.open(os.path.join(PATH,'data/'),'w','utf-8')
    for line in input_data.readlines():
        #line=re.split('[,。;!:?、‘’“”]/[o]'.decode('utf-8'),line.strip())
        line = line.strip().split()
        if len(line)==0: # skip empty lines
            continue
        for word in line: # iterate over every word in the list
            word = word.split('/') # each word now looks like ['希望工程', 'o']
            if word[1]!='o': # the tag is not o
                if len(word[0])==1: # a single character gets the B_ tag directly
                    output_data.write(word[0]+"/B_"+word[1]+" ")
                elif len(word[0])==2: # two characters are split into B_ and E_
                    output_data.write(word[0][0]+"/B_"+word[1]+" ")
                    output_data.write(word[0][1]+"/E_"+word[1]+" ")
                else: # more than two characters: B_ first, M_ in the middle, E_ last
                    output_data.write(word[0][0]+"/B_"+word[1]+" ")
                    for j in word[0][1:len(word[0])-1]:
                        output_data.write(j+"/M_"+word[1]+" ")
                    output_data.write(word[0][-1]+"/E_"+word[1]+" ")
            else: # the tag is o: split the word into single characters, each tagged /o
                for j in word[0]:
                    output_data.write(j+"/o"+" ")
        output_data.write('\n')
    input_data.close()
    output_data.close()
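# A quick check of the tagging scheme produced above, using a small illustrative helper
# (hypothetical, not part of the original script):
def tag_word(word, tag):
    """Character-level B/M/E tagging for a single word/tag pair, mirroring wordtag()."""
    if tag == 'o':
        return [c + '/o' for c in word]
    if len(word) == 1:
        return [word + '/B_' + tag]
    return ([word[0] + '/B_' + tag]
            + [c + '/M_' + tag for c in word[1:-1]]
            + [word[-1] + '/E_' + tag])
print(tag_word('哈佛大学', 'nt')) # ['哈/B_nt', '佛/M_nt', '大/M_nt', '学/E_nt']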
#============================Step 2: build the 2-D character list and the matching 2-D label list===================================
wordtag()
datas = list()
labels = list()
linedata = list()
linelabel = list()
# id 0 is reserved for sequence padding
tag2id = {''     : 0,
          'B_ns' : 1,
          'B_nr' : 2,
          'B_nt' : 3,
          'M_nt' : 4,
          'M_nr' : 5,
          'M_ns' : 6,
          'E_nt' : 7,
          'E_nr' : 8,
          'E_ns' : 9,
          'o'    : 10}
id2tag = {0 : '',
          1 : 'B_ns',
          2 : 'B_nr',
          3 : 'B_nt',
          4 : 'M_nt',
          5 : 'M_nr',
          6 : 'M_ns',
          7 : 'E_nt',
          8 : 'E_nr',
          9 : 'E_ns',
          10: 'o'}
input_data = codecs.open(os.path.join(PATH,'data/'),'r','utf-8')
for line in input_data.readlines(): # each line now looks like: 当/o 希/o 望/o 工/o 程/o 救/o 助/o ... (with a trailing '')
    line = re.split('[,。;!:?、‘’“”]/[o]', line.strip()) # cut the line into sub-sentences at tagged punctuation
    for sen in line:
        sen = sen.strip().split() # split each sub-sentence on spaces, so every item looks like 当/o
        if len(sen)==0: # skip empty pieces
            continue
        linedata = []
        linelabel = []
        num_not_o = 0
        for word in sen: # iterate over every character
            word = word.split('/') # the first element is the character, the second is the tag
            linedata.append(word[0]) # append the character to the character list
            linelabel.append(tag2id[word[1]]) # append the tag to the label list, mapped to its id
            if word[1]!='o':
                num_not_o += 1 # count the characters whose tag is not o
        if num_not_o!=0: # keep this piece only if it contains at least one entity character
            datas.append(linedata)
            labels.append(linelabel)
input_data.close()
print(len(datas))
print(len(labels))
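# To see what the re.split above does, here is the cut on a short artificial line:
# re.split('[,。;!:?、‘’“”]/[o]', '因/o 有/o 关/o ,/o 藏/o 界/o 重/o 视/o 。/o')
# -> ['因/o 有/o 关/o ', ' 藏/o 界/o 重/o 视/o ', '']
# i.e. every tagged punctuation mark ends a sub-sentence, and empty tails are filtered out above.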
#============================Step 3: build word2id and id2word===================================
#from compiler.ast import flatten no longer exists in Python 3, so we define our own flattener
def flat2gen(alist):
    for item in alist:
        if isinstance(item, list):
            for subitem in item: yield subitem
        else:
            yield item
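# e.g. list(flat2gen([[1, 2], [3]])) == [1, 2, 3]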
all_words = list(flat2gen(datas)) # a flat list containing every character
sr_allwords = pd.Series(all_words) # convert to a pandas Series
sr_allwords = sr_allwords.value_counts() # count each character's frequency, which also deduplicates
set_words = sr_allwords.index # each character is an index entry, sorted by frequency from high to low
set_ids = range(1, len(set_words)+1) # map each character to an id, starting from 1 because id 0 is reserved for padding
word2id = pd.Series(set_ids, index=set_words) # character -> id
id2word = pd.Series(set_words, index=set_ids) # id -> character
word2id["unknow"] = len(word2id)+1 # add an "unknow" entry; characters never seen before are mapped to its id
#============================Step 4: define the maximum sequence length and truncate/pad==================================
max_len = MAX_LEN # maximum sentence length
def X_padding(words):
    """Convert the characters to ids and truncate/pad to exactly max_len."""
    ids = list(word2id[words])
    if len(ids) >= max_len: # too long: truncate
        return ids[:max_len]
    ids.extend([0]*(max_len-len(ids))) # too short: pad with the reserved id 0
    return ids
def y_padding(ids):
    """Convert the tag ids and truncate/pad to exactly max_len."""
    if len(ids) >= max_len: # too long: truncate
        return ids[:max_len]
    ids.extend([0]*(max_len-len(ids))) # too short: pad with the reserved id 0
    return ids
def get_true_len(ids):
    return len(ids)
df_data = pd.DataFrame({'words': datas, 'tags': labels}, index=range(len(datas))) # one row per sentence: the character sequence and its label sequence
df_data['length'] = df_data["tags"].apply(get_true_len) # the true length of every sequence
df_data.loc[df_data['length'] > MAX_LEN, 'length'] = MAX_LEN # important: if a sequence is longer than MAX_LEN, its recorded length must be capped at MAX_LEN, otherwise the CRF will complain later
df_data['x'] = df_data['words'].apply(X_padding) # truncate/pad; a new column
df_data['y'] = df_data['tags'].apply(y_padding) # truncate/pad; a new column
x = np.asarray(list(df_data['x'].values)) # to an ndarray
y = np.asarray(list(df_data['y'].values)) # to an ndarray
length = np.asarray(list(df_data['length'].values)) # to an ndarray
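# Shape check: every row is now exactly MAX_LEN long, and length keeps the true (capped) lengths
print(x.shape, y.shape, length.shape) # e.g. (N, MAX_LEN), (N, MAX_LEN), (N,) with N == len(datas)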
#============================Step 5: split into training, test and validation sets==================================
#from sklearn.model_selection import train_test_split
#x_train,x_test, y_train, y_test = train_test_split(x, y, test_size=0.1, random_state=43) # random_state keeps the split reproducible
#x_train, x_valid, y_train, y_valid = train_test_split(x_train, y_train, test_size=0.2, random_state=43)
# sklearn's splitter cannot carry the per-sequence lengths along, so we write our own:
def split_data(data,label,seq_length,ratio):
    len_data = data.shape[0]
    # fix the random seed so that every run produces the same split
    np.random.seed(43)
    # permutation generates a random ordering of 0..len_data-1
    shuffled_indices = np.random.permutation(len_data)
    # ratio is the fraction that goes into the test side
    test_set_size = int(len_data * ratio)
    test_indices = shuffled_indices[:test_set_size]
    train_indices = shuffled_indices[test_set_size:]
    train_data = data[train_indices,:]
    train_label = label[train_indices]
    train_seq_length = seq_length[train_indices]
    test_data = data[test_indices,:]
    test_label = label[test_indices]
    test_seq_length = seq_length[test_indices]
    return train_data,test_data,train_label,test_label,train_seq_length,test_seq_length
x_train,x_test, y_train, y_test, z_train, z_test = split_data(x, y, seq_length=length, ratio=0.1) # 10% of the data becomes the test set
x_train, x_valid, y_train, y_valid, z_train, z_valid = split_data(x_train, y_train, seq_length=z_train, ratio=0.2) # 20% of the remainder becomes the validation set
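# Sanity check on the three-way split: 10% test, then 20% of the remainder for validation,
# i.e. roughly 72% / 18% / 10% of all sequences
print(len(x_train), len(x_valid), len(x_test))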
#============================Step 6: pickle everything we need for later use==================================
print('Finished creating the data generator.')
import pickle
with open(os.path.join(PATH,'process_data/msra/MSRA.pkl'), 'wb') as outp:
    pickle.dump(word2id, outp)
    pickle.dump(id2word, outp)
    pickle.dump(tag2id, outp)
    pickle.dump(id2tag, outp)
    pickle.dump(x_train, outp)
    pickle.dump(y_train, outp)
    pickle.dump(z_train, outp)
    pickle.dump(x_test, outp)
    pickle.dump(y_test, outp)
    pickle.dump(z_test, outp)
    pickle.dump(x_valid, outp)
    pickle.dump(y_valid, outp)
    pickle.dump(z_valid, outp)
print('** Finished saving the data.')
The intermediate df_data looks like this:
Note that the training, validation and test sets above are all carved out of the training data; characters not in the vocabulary are mapped to the id of 'unknow', and sentences shorter than the maximum length are padded with 0 up to it.
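Since pickle streams objects sequentially, reading the file back (for example inside the MsraDataset class used in section 4) must load them in exactly the order they were dumped. A minimal loading sketch under that assumption:
import os
import pickle
with open(os.path.join(PATH, 'process_data/msra/MSRA.pkl'), 'rb') as inp:
    word2id = pickle.load(inp)
    id2word = pickle.load(inp)
    tag2id  = pickle.load(inp)
    id2tag  = pickle.load(inp)
    x_train = pickle.load(inp); y_train = pickle.load(inp); z_train = pickle.load(inp)
    x_test  = pickle.load(inp); y_test  = pickle.load(inp); z_test  = pickle.load(inp)
    x_valid = pickle.load(inp); y_valid = pickle.load(inp); z_valid = pickle.load(inp)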
3. Defining the model
# -*- coding: utf-8 -*-
import numpy as np
import tensorflow as tf
class BilstmCrfModel:
    def __init__(self,config,embedding_pretrained,dropout_keep=1):
        self.max_len = config.msraConfig.max_len
        self.tag_size = config.msraConfig.tag_size
        self.pretrained = config.msraConfig.pre_trained
        self.embedding_size = config.msraConfig.embedding_size # vocabulary size (config attribute names assumed; the source was garbled here)
        self.embedding_dim = config.msraConfig.embedding_dim # embedding dimension
        self.embedding_pretrained = embedding_pretrained
        self.dropout_keep = dropout_keep
        self.inputX = tf.placeholder(dtype=tf.int32, shape=[None,self.max_len], name="input_data")
        self.inputY = tf.placeholder(dtype=tf.int32, shape=[None,self.max_len], name="labels")
        self.seq_lens = tf.placeholder(dtype=tf.int32, shape=[None])
        self._build_net()
    def _build_net(self):
        # word_embeddings: [4027,100]
        # embedding layer
        with tf.name_scope("embedding"):
            # initialize the embedding matrix from the pretrained word vectors when available
            if self.pretrained:
                embedding_w = tf.Variable(tf.cast(self.embedding_pretrained, dtype=tf.float32, name="word2vec"),
                                          name="embedding_w")
            else:
                embedding_w = tf.get_variable("embedding_w", shape=[self.embedding_size, self.embedding_dim],
                                              initializer=tf.contrib.layers.xavier_initializer())
            # map the input ids to vectors; shape [batch_size, max_len, embedding_dim]
            input_embedded = tf.nn.embedding_lookup(embedding_w, self.inputX)
            input_embedded = tf.nn.dropout(input_embedded, self.dropout_keep)
        with tf.name_scope("bilstm"):
            lstm_fw_cell = tf.nn.rnn_cell.BasicLSTMCell(self.embedding_dim, forget_bias=1.0, state_is_tuple=True)
            lstm_bw_cell = tf.nn.rnn_cell.BasicLSTMCell(self.embedding_dim, forget_bias=1.0, state_is_tuple=True)
            (output_fw, output_bw), states = tf.nn.bidirectional_dynamic_rnn(lstm_fw_cell,
                                                                             lstm_bw_cell,
                                                                             input_embedded,
                                                                             dtype=tf.float32,
                                                                             time_major=False,
                                                                             scope=None)
            # concatenate the forward and backward outputs: [batch_size, max_len, 2*embedding_dim]
            bilstm_out = tf.concat([output_fw, output_bw], axis=2)
        # Fully connected layer.
        with tf.name_scope("output"):
            W = tf.get_variable(
                "output_w",
                shape=[2 * self.embedding_dim, self.tag_size],
                initializer=tf.contrib.layers.xavier_initializer())
            b = tf.Variable(tf.constant(0.1, shape=[self.max_len, self.tag_size]), name="output_b")
            # tensordot multiplies the rank-3 LSTM output by the rank-2 weight matrix,
            # giving emission scores of shape [batch_size, max_len, tag_size]
            self.bilstm_out = tf.tanh(tf.tensordot(bilstm_out, W, axes=[[2], [0]]) + b)
        with tf.name_scope("crf"):
            # Linear-chain CRF.
            log_likelihood, self.transition_params = tf.contrib.crf.crf_log_likelihood(self.bilstm_out,
                                                                                       self.inputY,
                                                                                       self.seq_lens)
            self.loss = tf.reduce_mean(-log_likelihood)
            self.viterbi_sequence, viterbi_score = tf.contrib.crf.crf_decode(self.bilstm_out,
                                                                             self.transition_params,
                                                                             self.seq_lens)
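Before moving on, here is a tiny self-contained sketch of the two tf.contrib.crf calls the model relies on, run on random emission scores (the shapes are for illustration only; this is independent of the model class above):
import numpy as np
import tensorflow as tf

batch, max_len, tag_size = 2, 5, 11
logits = tf.constant(np.random.randn(batch, max_len, tag_size), dtype=tf.float32)
tags = tf.constant(np.random.randint(0, tag_size, (batch, max_len)), dtype=tf.int32)
seq_lens = tf.constant([5, 3], dtype=tf.int32) # the second sequence is padding after step 3

log_likelihood, transition_params = tf.contrib.crf.crf_log_likelihood(logits, tags, seq_lens)
loss = tf.reduce_mean(-log_likelihood) # the same loss definition as in the model
viterbi, score = tf.contrib.crf.crf_decode(logits, transition_params, seq_lens)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer()) # crf_log_likelihood creates the transition variable
    print(sess.run([loss, viterbi])) # viterbi has shape [batch, max_len]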
4. Defining the main function
from config.globalConfig import *
from config.msraConfig import Config
from dataset.msraDataset import MsraDataset
from dataset.get_batch import BatchGenerator # import path assumed; the original line was garbled
from models.bilstm_crf import BilstmCrfModel
import tensorflow as tf
import os
import numpy as np
from utils.help import find_all_tag,get_labels,get_multi_metric,mean,get_binary_metric # import path assumed for the same reason
labels_list = ['ns','nt','nr']
def train(config,model,save_path,trainBatchGen,valBatchGen):
    globalStep = tf.Variable(0, name="globalStep", trainable=False)
    save_path = os.path.join(save_path,"best_validation")
    saver = tf.train.Saver()
    with tf.Session() as sess:
        # define the train op:
        # the optimizer takes its learning rate from the config (the optimizer class is assumed,
        # since the original call was garbled)
        optimizer = tf.train.AdamOptimizer(config.trainConfig.learning_rate)
        # compute the gradients, obtaining (gradient, variable) pairs
        gradsAndVars = optimizer.compute_gradients(model.loss)
        # apply the gradients to the variables, producing the training op
        trainOp = optimizer.apply_gradients(gradsAndVars, global_step=globalStep)
        sess.run(tf.global_variables_initializer())
        best_f_beta_val = 0.0 # the best f1 seen on the validation set
        for epoch in range(config.trainConfig.epoch+1):
            for trainX_batch,trainY_batch,train_seqlen in trainBatchGen.next_batch(config.trainConfig.batch_size): # generator interface assumed
                feed_dict = {
                    model.inputX : trainX_batch, #[batch,max_len]
                    model.inputY : trainY_batch, #[batch,max_len]
                    model.seq_lens : train_seqlen, #[batch]
                }
                _, loss, pre = sess.run([trainOp,model.loss,model.viterbi_sequence],feed_dict)
                currentStep = tf.train.global_step(sess, globalStep)
                true_idx2label = [get_labels(label,idx2label,seq_len) for label,seq_len in zip(trainY_batch,train_seqlen)]
                pre_idx2label = [get_labels(label,idx2label,seq_len) for label,seq_len in zip(pre,train_seqlen)]
                precision,recall,f1 = get_multi_metric(true_idx2label,pre_idx2label,train_seqlen,labels_list)
                if currentStep % 100 == 0:
                    print("[train] step:{} loss:{:.4f} precision:{:.4f} recall:{:.4f} f1:{:.4f}".format(currentStep,loss,precision,recall,f1))
                if currentStep % 100 == 0:
                    # evaluate on all validation samples
                    losses = []
                    f_betas = []
                    precisions = []
                    recalls = []
                    for valX_batch,valY_batch,val_seqlen in valBatchGen.next_batch(config.trainConfig.batch_size): # generator interface assumed
                        feed_dict = {
                            model.inputX : valX_batch, #[batch,max_len]
                            model.inputY : valY_batch, #[batch,max_len]
                            model.seq_lens : val_seqlen, #[batch]
                        }
                        val_loss, val_pre = sess.run([model.loss,model.viterbi_sequence],feed_dict)
                        val_true_idx2label = [get_labels(label,idx2label,seq_len) for label,seq_len in zip(valY_batch,val_seqlen)]
                        val_pre_idx2label = [get_labels(label,idx2label,seq_len) for label,seq_len in zip(val_pre,val_seqlen)]
                        val_precision,val_recall,val_f1 = get_multi_metric(val_true_idx2label,val_pre_idx2label,val_seqlen,labels_list)
                        losses.append(val_loss)
                        f_betas.append(val_f1)
                        precisions.append(val_precision)
                        recalls.append(val_recall)
                    if mean(f_betas) > best_f_beta_val:
                        # save the best result so far
                        best_f_beta_val = mean(f_betas)
                        last_improved = currentStep
                        saver.save(sess=sess, save_path=save_path)
                        improved_str = '*'
                    else:
                        improved_str = ''
                    print("[val] loss:{:.4f} precision:{:.4f} recall:{:.4f} f1:{:.4f} {}".format(
                        mean(losses),mean(precisions),mean(recalls),mean(f_betas),improved_str
                    ))
def test(config,model,save_path,testBatchGen):
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        ckpt = tf.train.get_checkpoint_state('checkpoint/msra/')
        path = ckpt.model_checkpoint_path
        saver.restore(sess, path) # restore the best checkpoint saved during training
        precisions = []
        recalls = []
        f1s = []
        for testX_batch,testY_batch,test_seqlen in testBatchGen.next_batch(config.trainConfig.batch_size): # generator interface assumed
            feed_dict = {
                model.inputX : testX_batch, #[batch,max_len]
                model.inputY : testY_batch, #[batch,max_len]
                model.seq_lens : test_seqlen, #[batch]
            }
            # sess.run is given a list here, so unlike in train/val it returns a list; unwrap it
            test_pre = sess.run([model.viterbi_sequence],feed_dict)
            test_pre = test_pre[0]
            test_true_idx2label = [get_labels(label,idx2label,seq_len) for label,seq_len in zip(testY_batch,test_seqlen)]
            test_pre_idx2label = [get_labels(label,idx2label,seq_len) for label,seq_len in zip(test_pre,test_seqlen)]
            precision,recall,f1 = get_multi_metric(test_true_idx2label,test_pre_idx2label,test_seqlen,labels_list)
            precisions.append(precision)
            recalls.append(recall)
            f1s.append(f1)
        print("[test] precision:{:.4f} recall:{:.4f} f1:{:.4f}".format(
            mean(precisions),mean(recalls),mean(f1s)))
def predict(word2idx,idx2word,idx2label):
    max_len = 60
    input_list = []
    input_len = []
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        ckpt = tf.train.get_checkpoint_state('checkpoint/msra/')
        path = ckpt.model_checkpoint_path
        saver.restore(sess, path) # restore the trained weights
        while True:
            print("Please enter a sentence (q to quit):")
            line = input()
            if line == 'q':
                break
            line_len = len(line)
            if line_len > max_len:
                line_len = max_len # the length fed to the CRF must not exceed max_len
            input_len.append(line_len)
            word_list = [word2idx[word] if word in word2idx else word2idx['unknow'] for word in line]
            if line_len < max_len:
                word_list = word_list + [0]*(max_len-line_len)
            else:
                word_list = word_list[:max_len]
            input_list.append(word_list) # this also adds the batch dimension
            input_list = np.array(input_list)
            input_label = np.zeros((input_list.shape[0],input_list.shape[1]), dtype=np.int32) # dummy labels to fill the placeholder
            input_len = np.array(input_len)
            feed_dict = {
                model.inputX : input_list, #[batch,max_len]
                model.inputY : input_label, #[batch,max_len]
                model.seq_lens : input_len, #[batch]
            }
            pred_label = sess.run([model.viterbi_sequence],feed_dict)
            pred_label = pred_label[0]
            # map the predicted label ids back to label strings
            pred_idx2label = [get_labels(label,idx2label,seq_len) for label,seq_len in zip(pred_label,input_len)]
            for line,pre,s_len in zip(input_list,pred_idx2label,input_len):
                res = find_all_tag(pre,s_len)
                for k in res:
                    for v in res[k]:
                        if v:
                            print(k,"".join([idx2word[word] for word in line[v[0]:v[0]+v[1]]]))
            input_list = []
            input_len = []
if __name__ == "__main__":
    config = Config()
    msraDataset = MsraDataset(config)
    # the accessor names below are assumed; the original calls were garbled
    word2idx = msraDataset.get_word2idx()
    idx2word = msraDataset.get_idx2word()
    label2idx = msraDataset.get_label2idx()
    idx2label = msraDataset.get_idx2label()
    embedding_pre = msraDataset.get_embedding()
    x_train,y_train,z_train = msraDataset.get_train_data()
    x_val,y_val,z_val = msraDataset.get_val_data()
    x_test,y_test,z_test = msraDataset.get_test_data()
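The source stops after loading the data; to actually run things, the remaining wiring would look roughly like the sketch below. The BatchGenerator constructor signature and the save path are assumptions, not confirmed by the original:
    # wire everything together (a sketch under the assumptions stated above)
    trainBatchGen = BatchGenerator(x_train, y_train, z_train, shuffle=True)
    valBatchGen = BatchGenerator(x_val, y_val, z_val, shuffle=False)
    testBatchGen = BatchGenerator(x_test, y_test, z_test, shuffle=False)
    model = BilstmCrfModel(config, embedding_pre)
    save_path = 'checkpoint/msra/' # matches the path that test() and predict() restore from
    train(config, model, save_path, trainBatchGen, valBatchGen)
    test(config, model, save_path, testBatchGen)
    predict(word2idx, idx2word, idx2label)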