-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata.py
More file actions
288 lines (250 loc) · 10.8 KB
/
Copy pathdata.py
File metadata and controls
288 lines (250 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
from lib2to3.pgen2 import token
from turtle import pos
import pandas as pd
import numpy as np
import torch.nn.functional as F
from torch.utils import data
import os
import torch
from torch.utils.data import DataLoader,Dataset
import matplotlib.pyplot as plt
import jieba
from transformers import BertTokenizer
from bert.make_data import cut_sent
class ChSentiDataSet(Dataset):
def __init__(self, data_path, embedding) -> None:
# 由于数据比较少, 我们直接一次加载进来就好.
# label, review
self.data_all = pd.read_csv(data_path, keep_default_na=False, header=0)
self.embedding = embedding
super().__init__()
pass
def __len__(self):
# 返回数据集大小
return self.data_all.shape[0]
def __getitem__(self, index):
# 返回下标为index的数据项
review = self.data_all.iloc[index,1]
review = self.embedding.toTensor(review)
label =int(self.data_all.iloc[index,0])
return review, label
class embedding:
# 不太了解embedding, 先使用所有训练数据中的句子拆出的词表, 把出现次数少(<100)的词语视为other
# 拆出的词表需要去掉停止词
def __init__(self, all_sentence_path,minfr=100) -> None:
# 读取所有句子
all_sentence = pd.read_csv(all_sentence_path,header=0,names=['label','sentence'],keep_default_na=False)
#读取停止词
self.stop_words = pd.read_csv("data/stop_words.csv",header=None,sep='!')
# 计算词频
# FIXME: 性能有待提升
self.word_dict = {}
for sentence in all_sentence['sentence']:
words = jieba.lcut(sentence)
for wd in words:
self.word_dict[wd]= self.word_dict[wd]+1 if self.word_dict.__contains__(wd) else 1
# 得到所有单词
self.words = list(self.word_dict.keys())
# 删除停止词和低频词
for wd in self.word_dict:
if self.word_dict[wd]<=minfr:
self.words.remove(wd)
# 添加other
self.words.append("@other")
self.words.append("@pad")
self.words.append("@cls")
self.words.append("@sep")
def __len__(self):
return len(self.words)
def toTensor(self, sentence):
# 得到一个句子的向量化结果
# 词袋
try:
st_words = jieba.lcut(sentence)
except:
# print(sentence)
st_words = [' ']
wd_dict = dict(zip(self.words, np.zeros(len(self.words))))
for wd in st_words:
# if wd_dict.__contains__(wd):
if wd in self.stop_words:
continue
if wd in self.words:
wd_dict[wd] = wd_dict[wd]+1
else:
wd_dict["@other"]+=1
ret = torch.tensor(list(wd_dict.values()),dtype=float)
ret = F.softmax(ret,dim=-1)
return ret
class oneHotEmbedding(embedding):
def __init__(self, all_sentence_path, length, minfr=100) -> None:
super().__init__(all_sentence_path, minfr)
# 得到词典
self.wordsindex = dict((_,i) for i,_ in enumerate(self.words))
self.length = length
def toTensor(self, sentence):
sent_words = jieba.lcut(sentence)
sent_index = []
# 把词语转换维字典对应的序号
for word in sent_words:
if self.wordsindex.__contains__(word):
sent_index.append(self.wordsindex[word]) # 已存在的
else:
sent_index.append(self.wordsindex['@other']) # unk
while len(sent_index) < self.length: # padding
sent_index.append(self.wordsindex['@pad'])
# 将id转换为独热码
id = torch.tensor(sent_index[:self.length]).reshape(self.length,1)
sent_tensor = torch.zeros((self.length,len(self.words)), dtype=torch.float).scatter_(dim=1, index=id, value=1)
return sent_tensor.unsqueeze(0) # 为了后面卷积方便调用接口, 因此加一层channel维
class indexDictEmbedding(oneHotEmbedding):
def __init__(self, all_sentence_path, length, minfr=100) -> None:
super().__init__(all_sentence_path, length, minfr)
def toTensor(self, sentence):
sent_words = jieba.lcut(sentence)
sent_index = []
# 把词语转换维字典对应的序号
for word in sent_words:
if self.wordsindex.__contains__(word):
sent_index.append(self.wordsindex[word]) # 已存在的
else:
sent_index.append(self.wordsindex['@other']) # unk
while len(sent_index) < self.length: # padding
sent_index.append(self.wordsindex['@pad'])
# 得到句子中每个单词的id
id = torch.tensor(sent_index[:self.length])
return id.unsqueeze(0) # 为了后面卷积方便调用接口, 因此加一层channel维
class bertEmbedding(embedding):
def __init__(self, all_sentence_path, length, minfr=100) -> None:
super().__init__(all_sentence_path, minfr)
self.length = length
# 使用的预训练模型:https://github.com/ymcui/Chinese-BERT-wwm
# self.tokenizer = BertTokenizer.from_pretrained('hfl/chinese-bert-wwm',padding=True, truncation=True, return_tensors="pt")
self.tokenizer = BertTokenizer('data/bert_min/vocab.txt')
def __len__(self):
return self.tokenizer.vocab_size
def toTensor(self, sentence):
token_words = self.tokenizer.encode_plus(sentence, max_length=self.length, padding='max_length',truncation=True)
token_words['attention_mask'] = (np.array(token_words['attention_mask'])-1)*([-1]) # if my bert
return torch.tensor([token_words['input_ids'], token_words['token_type_ids'], token_words['attention_mask']])
# return torch.tensor(token_words['input_ids'])
class bertEmbeddingv1(embedding):
def __init__(self, all_sentence_path, length, minfr=100) -> None:
super().__init__(all_sentence_path, minfr)
# 得到词典
self.wordsindex = dict((_,i) for i,_ in enumerate(self.words))
self.length = length
def toTensor(self, sentence):
sent_words = jieba.lcut(sentence)
sent_index = []
# 把词语转换维字典对应的序号
sent_index.append(self.wordsindex['@cls'])
for word in sent_words:
if self.wordsindex.__contains__(word):
sent_index.append(self.wordsindex[word]) # 已存在的
else:
sent_index.append(self.wordsindex['@other']) # unk
sent_index.append(self.wordsindex['@sep'])
while len(sent_index) < self.length: # padding
sent_index.append(self.wordsindex['@pad'])
# 得到句子中每个单词的id
input_ids = torch.tensor(sent_index[:self.length])
attention_mask = torch.tensor([i!=self.wordsindex['@pad'] for i in sent_index])
token_type_ids = torch.tensor([0]*self.length)
position_ids = torch.tensor([i for i in range(self.length)])
return torch.tensor(input_ids, attention_mask, token_type_ids, position_ids)
class HANembedding(embedding):
def __init__(self, all_sentence_path, doc_len, sent_len, minfr=100) -> None:
super().__init__(all_sentence_path, minfr)
self.doc_len = doc_len
self.sent_len = sent_len
self.wordsindex = dict((_,i) for i,_ in enumerate(self.words))
def toTensor(self, sentence):
doc = sentence
ret = []
for sent in cut_sent(doc):
sent_words = jieba.lcut(sent)
sent_index = []
# 把词语转换维字典对应的序号
for word in sent_words:
if self.wordsindex.__contains__(word):
sent_index.append(self.wordsindex[word]) # 已存在的
else:
sent_index.append(self.wordsindex['@other']) # unk
while len(sent_index) < self.sent_len: # padding
sent_index.append(self.wordsindex['@pad'])
ret.append(sent_index[:self.sent_len])
while len(ret) < self.doc_len:
ret.append([self.wordsindex['@pad']]*self.sent_len)
ret = ret[:self.doc_len]
return torch.tensor(ret) # [doc_len, sent_len]的二维数组
class corpusInfo:
def __init__(self, path) -> None:
self.data = pd.read_csv(path, keep_default_na=False, header=0, names=['label','sentence'])
def maxWords(self):
max_cnt = 0
max_sent =''
for sent in self.data['sentence']:
word_sent = jieba.lcut(sent)
if max_cnt < len(word_sent):
max_cnt = len(word_sent)
max_sent = sent
return (max_sent, max_cnt)
def minWords(self):
min_cnt = 1000000
min_sent =''
for sent in self.data['sentence']:
word_sent = jieba.lcut(sent)
if min_cnt > len(word_sent) and len(word_sent) != 0:
min_cnt = len(word_sent)
min_sent = sent
return (min_sent, min_cnt)
def meanWords(self):
mean_cnt = 0.0
for sent in self.data['sentence']:
word_sent = jieba.lcut(sent)
mean_cnt += len(word_sent)
return mean_cnt/3200.0
def histWords(self):
cnt = []
for sent in self.data['sentence']:
cnt.append(len(jieba.lcut(sent)))
cnt.sort()
plt.hist(cnt,bins=300)
plt.show()
print(cnt[round(len(cnt)*0.9)])
def HANstatics(self):
doc_all = self.data['sentence']
sent_len = []
word_len = []
sent = []
word = []
for doc in doc_all:
sents = cut_sent(doc)
sent.append(sents)
sent_len.append(len(sents))
words = []
for st in sents:
token = jieba.lcut(st)
words.append(token)
word_len.append(len(token))
word.append(words)
print(f"mean length of the doc:{np.mean(sent_len)}")
print(f"mean length of the sentence:{np.mean(word_len)}")
print(f"max length of the doc:{np.max(sent_len)}")
print(f"max length of the sentence:{np.max(word_len)}")
print(f"min length of the doc:{np.min(sent_len)}")
print(f"min length of the sentence:{np.min(word_len)}")
sent_len.sort()
word_len.sort()
plt.subplot(2, 1, 1)
plt.hist(sent_len,bins=300)
plt.subplot(2, 1, 2)
plt.hist(word_len,bins=300)
plt.show()
if __name__ == "__main__":
info = corpusInfo("data/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv")
info.HANstatics()
# embedding = bertEmbedding("data/ChnSentiCorp_htl_all/ChnSentiCorp_htl_all.csv", 128)
# train_data = ChSentiDataSet("data/ChnSentiCorp_htl_all/train_1600+1600.csv", embedding)
# print(train_data.__getitem__(2))