使用 Transformers 微调 BERT 模型,完成中文命名实体识别(NER)任务

注意版本!
- python == 3.8.6
- torch == 1.10.0
- transformers == 4.36.2
- datasets == 2.15.0
import json
# 数据集下载地址:https://www.cluebenchmarks.com/introduce.html
# 细粒度命名实体识别->下载
# 将数据转为 BIO 标注形式
def dimension_label(path, save_path, labels_path=None):
    """Convert a span-annotated JSON-lines file into character-level BIO tags.

    Every non-blank input line is a JSON object with a ``text`` string and an
    optional ``label`` mapping of the form
    ``{entity_type: {entity_text: [[start, end], ...]}}`` where ``end`` is
    inclusive. One JSON object per line is written to ``save_path`` holding
    the original ``text`` and a ``label`` list with one tag per character.

    Args:
        path: Input JSON-lines file.
        save_path: Output JSON-lines file. It is overwritten, so running the
            conversion twice does not duplicate samples.
        labels_path: Optional file that receives the ``{tag: id}`` map built
            from the tags seen in this input ('O' is always id 0).

    Raises:
        IndexError: If a span points outside ``text``.
    """
    label_names = ['O']
    known = {'O'}  # set mirror of label_names for O(1) membership tests

    with open(path, "r", encoding="utf-8") as reader:
        with open(save_path, "w", encoding="utf-8") as writer:
            for raw in reader:
                if not raw.strip():
                    # Tolerate blank / trailing lines instead of crashing in json.loads.
                    continue
                record = json.loads(raw)
                text = record['text']
                annotations = record.get('label') or {}
                text_label = ['O'] * len(text)

                for label_key, label_item in annotations.items():
                    b_label = "B-" + label_key
                    i_label = "I-" + label_key
                    for tag in (b_label, i_label):
                        if tag not in known:
                            known.add(tag)
                            label_names.append(tag)

                    for positions in label_item.values():
                        # An entity string may occur several times in one
                        # sentence; tag every occurrence, not only the first.
                        for start, end in positions:
                            text_label[start] = b_label
                            for i in range(start + 1, end + 1):
                                text_label[i] = i_label

                converted = {"text": text, "label": text_label}
                writer.write(json.dumps(converted, ensure_ascii=False) + "\n")

    if labels_path:
        # Persist the tag -> id map for later training and prediction.
        label_map = {tag: index for index, tag in enumerate(label_names)}
        with open(labels_path, "w", encoding="utf-8") as writer:
            writer.write(json.dumps(label_map, ensure_ascii=False) + "\n")
if __name__ == '__main__':
    # Convert the dev split first (no label map written), then the train
    # split, whose tag vocabulary is saved for training and prediction.
    for path, save_path, labels_path in (
        ("./cluener_public/dev.json", "./data/dev.json", None),
        ("./cluener_public/train.json", "./data/train.json", "./data/labels.json"),
    ):
        dimension_label(path, save_path, labels_path)
# 处理数据集构建 Dataset
from torch.utils.data import Dataset, DataLoader
import torch
import json
class NERDataset(Dataset):
    """Serve BIO-tagged sentences from a JSON-lines file as fixed-length tensors.

    Each line of the file is parsed as a JSON object; its ``text`` and
    ``label`` fields are kept in memory and encoded lazily, one item at a time.
    """

    def __init__(self, tokenizer, file_path, labels_map, max_length=300):
        """Read every sample from ``file_path``.

        Args:
            tokenizer: Object exposing ``encode_plus``; used in ``__getitem__``.
            file_path: JSON-lines file with ``text`` and ``label`` per line.
            labels_map: Mapping from tag string to integer id.
            max_length: Length that token ids and label ids are brought to.
        """
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.labels_map = labels_map
        self.text_data = []
        self.label_data = []
        with open(file_path, "r", encoding="utf-8") as reader:
            for raw in reader:
                record = json.loads(raw)
                sentence, tags = record['text'], record['label']
                self.text_data.append(sentence)
                self.label_data.append(tags)

    def __len__(self):
        """Return the number of loaded sentences."""
        return len(self.text_data)

    def __getitem__(self, idx):
        """Encode sample ``idx`` into ``input_ids``, ``attention_mask`` and ``labels``."""
        sentence = self.text_data[idx]
        tags = self.label_data[idx]

        # Let the tokenizer pad / truncate the sentence to max_length.
        encoded = self.tokenizer.encode_plus(
            sentence,
            None,
            add_special_tokens=True,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )

        # Map tags to ids, cut to max_length, then right-pad with id 0.
        # NOTE(review): tags are per character while add_special_tokens=True
        # lets the tokenizer insert extra tokens; label i is placed at token
        # position i without any offset - confirm this pairing is intended.
        tag_ids = [self.labels_map[tag] for tag in tags][:self.max_length]
        tag_ids = tag_ids + [0] * (self.max_length - len(tag_ids))

        return {
            'input_ids': encoded['input_ids'].squeeze(),
            'attention_mask': encoded['attention_mask'].squeeze(),
            'labels': torch.LongTensor(tag_ids)
        }
# 模型迭代训练
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForTokenClassification
# from ner_datasets import NERDataset
from tqdm import tqdm
import json
import time, sys
import numpy as np
from sklearn.metrics import f1_score
def train(epoch, model, device, loader, optimizer, gradient_accumulation_steps):
    """Run one training epoch with gradient accumulation.

    Args:
        epoch: Epoch index, used only for progress / log output.
        model: Module called as ``model(input_ids, attention_mask=..., labels=...)``
            whose output exposes ``.loss``.
        device: Device the batch tensors are moved to.
        loader: Iterable with ``len()`` yielding dicts that hold ``input_ids``,
            ``attention_mask`` and ``labels`` tensors.
        optimizer: Optimizer stepped once per accumulation window.
        gradient_accumulation_steps: Number of batches whose gradients are
            summed before each optimizer step (values below 1 are treated as 1).
    """
    model.train()
    accumulation = max(1, gradient_accumulation_steps)
    last_index = len(loader) - 1
    time1 = time.time()

    # Start from clean gradients so nothing left over from earlier work
    # leaks into the first accumulation window.
    optimizer.zero_grad()

    for index, data in enumerate(tqdm(loader, file=sys.stdout, desc="Train Epoch: " + str(epoch))):
        input_ids = data['input_ids'].to(device)
        attention_mask = data['attention_mask'].to(device)
        labels = data['labels'].to(device)

        outputs = model(
            input_ids,
            attention_mask=attention_mask,
            labels=labels
        )
        loss = outputs.loss

        # Scale before backward so the accumulated gradient is the mean over
        # the window rather than the sum; the unscaled loss is kept for logging.
        (loss / accumulation).backward()

        # Step after every `accumulation` batches (counting from the first
        # batch), and once more at the end so a trailing partial window is
        # not dropped.
        if (index + 1) % accumulation == 0 or index == last_index:
            optimizer.step()
            optimizer.zero_grad()

        # Report the loss every 100 batches and on the final batch.
        if index % 100 == 0 or index == last_index:
            time2 = time.time()
            tqdm.write(
                f"{index}, epoch: {epoch} -loss: {str(loss)} ; each step's time spent: {(str(float(time2 - time1) / float(index + 0.0001)))}")
def validate(model, device, loader):
    """Evaluate the model and return ``(accuracy, macro_f1)``.

    Both metrics are computed once over every position of the whole loader
    that the attention mask marks as real input. Padding positions are left
    out so they cannot inflate the scores, and macro-F1 is not averaged over
    per-batch values, which would depend on the batch size.

    Args:
        model: Module called as ``model(input_ids, attention_mask=...)`` whose
            output exposes ``.logits`` with the label dimension last.
        device: Device the input tensors are moved to.
        loader: Iterable yielding dicts that hold ``input_ids``,
            ``attention_mask`` and ``labels`` tensors.

    Returns:
        Tuple of floats ``(accuracy, macro_f1)``; ``(0.0, 0.0)`` when there is
        nothing to score.
    """
    model.eval()
    predicted_all = []
    true_all = []

    with torch.no_grad():
        for data in tqdm(loader, file=sys.stdout, desc="Validation Data"):
            input_ids = data['input_ids'].to(device)
            attention_mask = data['attention_mask'].to(device)
            labels = data['labels']

            outputs = model(input_ids, attention_mask=attention_mask)
            _, predicted_labels = torch.max(outputs.logits, dim=2)

            # Boolean-mask indexing flattens the kept positions of the batch.
            keep = attention_mask.detach().cpu().bool()
            predicted_all.extend(predicted_labels.detach().cpu()[keep].tolist())
            true_all.extend(labels.detach().cpu()[keep].tolist())

    if not true_all:
        # Empty loader (or fully masked input): avoid dividing by zero.
        return 0.0, 0.0

    accuracy = float((np.array(predicted_all) == np.array(true_all)).mean())
    f1 = float(f1_score(true_all, predicted_all, average='macro'))
    return accuracy, f1
def main():
    """Fine-tune a token-classification model and keep the best checkpoint.

    Loads the tag map, tokenizer and pretrained model, builds the train and
    validation loaders, then alternates training and validation for a fixed
    number of epochs. Model and tokenizer are saved whenever validation
    accuracy improves on the best value seen so far.
    """
    # Paths and hyper-parameters.
    labels_path = "./data/labels.json"
    model_name = 'D:\\AIGC\\model\\chinese-roberta-wwm-ext'
    train_json_path = "./data/train.json"
    val_json_path = "./data/dev.json"
    max_length = 300
    epochs = 5
    batch_size = 1
    lr = 1e-4
    gradient_accumulation_steps = 16
    model_output_dir = "output"

    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')

    # Tag -> id map.
    with open(labels_path, "r", encoding="utf-8") as handle:
        labels_map = json.load(handle)

    # Tokenizer and model; the classifier head is sized by the tag map.
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name, num_labels=len(labels_map))
    model.to(device)

    def build_loader(message, json_path, shuffle):
        # Announce, then wrap one split in a Dataset and DataLoader.
        print(message)
        dataset = NERDataset(tokenizer, json_path, labels_map, max_length)
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    train_loader = build_loader("Start Load Train Data...", train_json_path, True)
    val_loader = build_loader("Start Load Validation Data...", val_json_path, False)

    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

    print("Start Training...")
    best_acc = 0.0
    for epoch in range(epochs):
        train(epoch, model, device, train_loader, optimizer, gradient_accumulation_steps)
        print("Start Validation...")
        acc, f1 = validate(model, device, val_loader)
        print(f"Validation : acc: {acc} , f1: {f1}")
        if not (best_acc < acc):
            continue
        # New best validation accuracy: persist model and tokenizer together.
        print("Save Model To ", model_output_dir)
        model.save_pretrained(model_output_dir)
        tokenizer.save_pretrained(model_output_dir)
        best_acc = acc


if __name__ == '__main__':
    main()
# 模型测试
from transformers import AutoTokenizer, AutoModelForTokenClassification
import torch
import json
# 解析实体
def post_processing(outputs, text, labels_map):
    """Decode model logits into ``{entity_type: [entity_text, ...]}``.

    The prediction at position ``i`` of the first (only) sequence is paired
    with character ``i`` of ``text``, the same pairing the training dataset in
    this file uses for its labels.

    Args:
        outputs: Model output exposing ``.logits`` with the label dimension last.
        text: The raw input string.
        labels_map: Mapping from label id to tag string ('O', 'B-x', 'I-x').

    Returns:
        Dict mapping each entity type to the entity strings found, in order of
        appearance. Characters beyond the number of predicted positions are
        ignored.
    """
    _, predicted_labels = torch.max(outputs.logits, dim=2)
    predicted_labels = predicted_labels.detach().cpu().numpy()
    predicted_tags = [labels_map[label_id] for label_id in predicted_labels[0]]

    result = {}
    entity = ""
    entity_type = ""

    # zip stops at the shorter sequence, so text longer than the prediction
    # no longer raises IndexError.
    for word_token, tag in zip(text, predicted_tags):
        if tag.startswith("B-"):
            # Close the running entity under ITS type before switching to
            # the type of the entity that starts here.
            if entity:
                result.setdefault(entity_type, []).append(entity)
            entity_type = tag.split("-", 1)[1]
            entity = word_token
        elif tag.startswith("I-"):
            # An I- tag with no open entity is ignored.
            if entity:
                # The most recent tag decides the type the entity is filed under.
                entity_type = tag.split("-", 1)[1]
                entity += word_token
        else:
            if entity:
                result.setdefault(entity_type, []).append(entity)
            entity = ""

    # Emit an entity that runs up to the last character.
    if entity:
        result.setdefault(entity_type, []).append(entity)
    return result
def main():
    """Interactive demo: read a sentence, run the fine-tuned model, print entities.

    Loads the tag map from ``./data/labels.json`` (inverted to id -> tag) and
    the tokenizer / model from ``./output``. Empty input is skipped and the
    single character ``q`` ends the loop.
    """
    labels_path = "./data/labels.json"
    model_name = './output'
    max_length = 300
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # The file stores tag -> id; decoding needs id -> tag.
    with open(labels_path, "r", encoding="utf-8") as r:
        labels = json.loads(r.read())
    labels_map = {label_id: label for label, label_id in labels.items()}

    # Load tokenizer and model.
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name, num_labels=len(labels_map))
    model.to(device)
    # Make inference mode explicit rather than relying on the loader's default.
    model.eval()

    while True:
        text = input("请输入:")
        if not text:
            continue
        if text == 'q':
            break

        encoded_input = tokenizer(text, padding="max_length", truncation=True, max_length=max_length)
        input_ids = torch.tensor([encoded_input['input_ids']]).to(device)
        attention_mask = torch.tensor([encoded_input['attention_mask']]).to(device)

        # No gradients are needed for prediction; skipping autograd
        # bookkeeping saves memory on every request.
        with torch.no_grad():
            outputs = model(input_ids, attention_mask=attention_mask)

        result = post_processing(outputs, text, labels_map)
        print(result)


if __name__ == '__main__':
    main()
更多推荐
所有评论(0)