-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmodels.py
109 lines (90 loc) · 4.7 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import torch
import torch.nn.functional as F
import transformers
from torch import nn
from torch.nn.utils.rnn import pad_sequence
# Model name or path handed to `transformers.AutoModel.from_pretrained` when
# the caller does not supply one.
DEFAULT_BERT_PRETRAINED_NAME_OR_PATH = 'DeepPavlov/rubert-base-cased-conversational'  # alternative noted by the author: "bert-base-cased"
def get_pretrained_bert(pretrained_name_or_path=None):
    """Load a pretrained model through ``transformers.AutoModel``.

    Args:
        pretrained_name_or_path: Name or path passed to ``from_pretrained``.
            Any falsy value (``None``, ``""``) selects
            ``DEFAULT_BERT_PRETRAINED_NAME_OR_PATH``.

    Returns:
        The model object returned by ``transformers.AutoModel.from_pretrained``.
    """
    if not pretrained_name_or_path:
        pretrained_name_or_path = DEFAULT_BERT_PRETRAINED_NAME_OR_PATH
    model = transformers.AutoModel.from_pretrained(pretrained_name_or_path)
    return model
class SubwordBert(nn.Module):
    """BERT encoder followed by a per-word linear classification head.

    The sub-word encodings produced by the BERT model are merged back into one
    vector per word (using per-word sub-word counts supplied by the caller)
    and projected to ``output_dim`` logits by a dense layer.
    """

    def __init__(self, padding_idx, output_dim, bert_pretrained_name_or_path=None, freeze_bert=False):
        """Build the encoder, the output layer and the loss.

        Args:
            padding_idx: Target index ignored by the cross-entropy loss.
            output_dim: Number of output classes; must be positive.
            bert_pretrained_name_or_path: Forwarded to ``get_pretrained_bert``.
            freeze_bert: When true, gradients are disabled for every BERT
                parameter so only the dense layer is trained.
        """
        super().__init__()

        self.bert_dropout = torch.nn.Dropout(0.2)
        self.bert_model = get_pretrained_bert(bert_pretrained_name_or_path)
        self.bertmodule_outdim = self.bert_model.config.hidden_size
        if freeze_bert:
            for param in self.bert_model.parameters():
                param.requires_grad = False

        # output module
        assert output_dim > 0
        self.dense = nn.Linear(self.bertmodule_outdim, output_dim)

        # loss
        # See https://pytorch.org/docs/stable/nn.html#crossentropyloss
        self.criterion = nn.CrossEntropyLoss(reduction='mean', ignore_index=padding_idx)

    @property
    def device(self) -> torch.device:
        """Device of the first parameter registered on this module."""
        return next(self.parameters()).device

    def get_merged_encodings(self, bert_seq_encodings, seq_splits, mode='avg'):
        """Merge the sub-word encodings of one sequence into per-word vectors.

        Args:
            bert_seq_encodings: 2-D tensor of encodings for a single sequence,
                one row per sub-word, with one leading and one trailing
                special-token row ([CLS] / [SEP]) and optional padding rows.
            seq_splits: List with the number of sub-words of each word.
            mode: ``'avg'`` to average the sub-word vectors of a word,
                ``'add'`` to sum them.

        Returns:
            Tensor with one row per entry of ``seq_splits``.

        Raises:
            NotImplementedError: If ``mode`` is neither ``'avg'`` nor ``'add'``.
        """
        # Keep only the real tokens (+2 for [CLS] and [SEP]), then strip both
        # special tokens so that the rows line up with ``seq_splits``.
        bert_seq_encodings = bert_seq_encodings[:sum(seq_splits) + 2, :]
        bert_seq_encodings = bert_seq_encodings[1:-1, :]

        # One tensor per word, zero-padded to the longest word so that a sum
        # over dim=1 only adds the real sub-word rows.
        per_word = torch.split(bert_seq_encodings, seq_splits, dim=0)
        batched_encodings = pad_sequence(per_word, batch_first=True, padding_value=0)

        if mode == 'avg':
            # Build the divisor on the encodings' own device so the method
            # does not depend on where the module parameters live.
            counts = torch.tensor(seq_splits, device=batched_encodings.device).reshape(-1, 1)
            out = torch.div(torch.sum(batched_encodings, dim=1), counts)
        elif mode == "add":
            out = torch.sum(batched_encodings, dim=1)
        else:
            raise NotImplementedError("Not Implemented")
        return out

    def forward(self,
                batch_bert_dict: "{'input_ids':tensor, 'attention_mask':tensor, 'token_type_ids':tensor}",
                batch_splits: "list[list[int]]",
                aux_word_embs: "tensor" = None,
                targets: "tensor" = None,
                topk=1):
        """Compute per-word logits and, depending on the mode, loss / predictions.

        Args:
            batch_bert_dict: Keyword arguments forwarded to the BERT model.
            batch_splits: For every sequence, the sub-word count of each word.
            aux_word_embs: Optional tensor concatenated to the merged
                encodings along dim 2 ([BS, max_nwords, *]).
            targets: Optional target indices, [BS, max_nwords].
            topk: Number of predictions per word returned in eval mode.

        Returns:
            In training mode, the loss tensor. In eval mode, a tuple
            ``(loss, predictions)`` of NumPy arrays, where ``loss`` is ``None``
            when no ``targets`` were given and ``predictions`` has shape
            [BS, max_nwords] for ``topk == 1`` or [BS, max_nwords, topk]
            otherwise.

        Raises:
            ValueError: In training mode when ``targets`` is ``None``, or in
                eval mode when ``topk`` is smaller than 1.
        """
        batch_size = len(batch_splits)

        # bert: [BS, max_nsubwords, bertmodule_outdim]
        bert_encodings = self.bert_model(**batch_bert_dict, return_dict=False)[0]
        bert_encodings = self.bert_dropout(bert_encodings)

        # merge sub-words: [BS, max_nwords, bertmodule_outdim]
        bert_merged_encodings = pad_sequence(
            [
                self.get_merged_encodings(seq_encodings, seq_splits, mode='avg')
                for seq_encodings, seq_splits in zip(bert_encodings, batch_splits)
            ],
            batch_first=True,
            padding_value=0,
        )

        # concat aux_embs
        # NOTE(review): ``self.dense`` is sized for ``bertmodule_outdim`` only,
        # so a non-empty ``aux_word_embs`` looks like it would not fit the
        # dense layer -- confirm intended usage with callers.
        intermediate_encodings = bert_merged_encodings
        if aux_word_embs is not None:
            intermediate_encodings = torch.cat((intermediate_encodings, aux_word_embs), dim=2)

        # dense: [BS, max_nwords, *] -> [BS, max_nwords, output_dim]
        logits = self.dense(intermediate_encodings)

        # loss (only computable when targets are supplied)
        loss = None
        if targets is not None:
            assert len(targets) == batch_size  # targets: [BS, max_nwords]
            # CrossEntropyLoss expects the class dimension second.
            logits_permuted = logits.permute(0, 2, 1)  # [BS, output_dim, max_nwords]
            loss = self.criterion(logits_permuted, targets)

        if self.training:
            if loss is None:
                raise ValueError("targets must be provided when the model is in training mode")
            return loss

        # eval preds
        if topk < 1:
            raise ValueError("topk must be >= 1, got {}".format(topk))
        probs = F.softmax(logits, dim=-1)  # [BS, max_nwords, output_dim]
        if topk > 1:
            # -> (Tensor, LongTensor) of [BS, max_nwords, topk]
            _, topk_inds = torch.topk(probs, topk, dim=-1, largest=True, sorted=True)
        else:
            # Padded word positions still receive an index here because the
            # bias term makes their logits non-uniform.
            topk_inds = torch.argmax(probs, dim=-1)  # [BS, max_nwords]

        loss_value = None if loss is None else loss.cpu().detach().numpy()
        return loss_value, topk_inds.cpu().detach().numpy()