# model.py
from copy import deepcopy

import torch
import torch.nn as nn
import torch.nn.functional as F

from models.reg_lstm.weight_drop import WeightDrop
from models.reg_lstm.embed_regularize import embedded_dropout


class RegLSTM(nn.Module):
    """Regularized LSTM classifier with embedding dropout, weight drop, and temporal averaging (EMA)."""

    def __init__(self, config):
        super().__init__()
        dataset = config.dataset
        target_class = config.target_class
        self.is_bidirectional = config.bidirectional
        self.has_bottleneck_layer = config.bottleneck_layer
        self.mode = config.mode
        self.tar = config.tar  # Temporal activation regularization flag
        self.ar = config.ar  # Activation regularization flag
        self.beta_ema = config.beta_ema  # Temporal averaging
        self.wdrop = config.wdrop  # Weight dropping
        self.embed_droprate = config.embed_droprate  # Embedding dropout
        if config.mode == 'rand':
            rand_embed_init = torch.Tensor(config.words_num, config.words_dim).uniform_(-0.25, 0.25)
            self.embed = nn.Embedding.from_pretrained(rand_embed_init, freeze=False)
        elif config.mode == 'static':
            self.static_embed = nn.Embedding.from_pretrained(dataset.TEXT_FIELD.vocab.vectors, freeze=True)
        elif config.mode == 'non-static':
            self.non_static_embed = nn.Embedding.from_pretrained(dataset.TEXT_FIELD.vocab.vectors, freeze=False)
        else:
            raise ValueError("Unsupported mode: %s" % config.mode)
        self.lstm = nn.LSTM(config.words_dim, config.hidden_dim, dropout=config.dropout, num_layers=config.num_layers,
                            bidirectional=self.is_bidirectional, batch_first=True)
        if self.wdrop:
            self.lstm = WeightDrop(self.lstm, ['weight_hh_l0'], dropout=self.wdrop)
        self.dropout = nn.Dropout(config.dropout)

        if self.has_bottleneck_layer:
            if self.is_bidirectional:
                self.fc1 = nn.Linear(2 * config.hidden_dim, config.hidden_dim)  # Hidden bottleneck layer
                self.fc2 = nn.Linear(config.hidden_dim, target_class)
            else:
                self.fc1 = nn.Linear(config.hidden_dim, config.hidden_dim // 2)  # Hidden bottleneck layer
                self.fc2 = nn.Linear(config.hidden_dim // 2, target_class)
        else:
            if self.is_bidirectional:
                self.fc1 = nn.Linear(2 * config.hidden_dim, target_class)
            else:
                self.fc1 = nn.Linear(config.hidden_dim, target_class)

        if self.beta_ema > 0:
            self.avg_param = deepcopy(list(p.data for p in self.parameters()))
            if torch.cuda.is_available():
                self.avg_param = [a.cuda() for a in self.avg_param]
            self.steps_ema = 0.

    def forward(self, x, lengths=None):
        """Encode a batch of token ids x (batch, seq_len); returns logits, plus the RNN outputs when tar/ar is set."""
        if self.mode == 'rand':
            x = embedded_dropout(self.embed, x, dropout=self.embed_droprate if self.training else 0) if self.embed_droprate else self.embed(x)
        elif self.mode == 'static':
            x = embedded_dropout(self.static_embed, x, dropout=self.embed_droprate if self.training else 0) if self.embed_droprate else self.static_embed(x)
        elif self.mode == 'non-static':
            x = embedded_dropout(self.non_static_embed, x, dropout=self.embed_droprate if self.training else 0) if self.embed_droprate else self.non_static_embed(x)
        else:
            raise ValueError("Unsupported mode: %s" % self.mode)

        if lengths is not None:
            x = torch.nn.utils.rnn.pack_padded_sequence(x, lengths, batch_first=True)

        rnn_outs, _ = self.lstm(x)
        rnn_outs_temp = rnn_outs
        if lengths is not None:
            rnn_outs, _ = torch.nn.utils.rnn.pad_packed_sequence(rnn_outs, batch_first=True)
            rnn_outs_temp, _ = torch.nn.utils.rnn.pad_packed_sequence(rnn_outs_temp, batch_first=True)

        # Max-pool the LSTM outputs over the time dimension
        x = F.relu(torch.transpose(rnn_outs_temp, 1, 2))
        x = F.max_pool1d(x, x.size(2)).squeeze(2)
        x = self.dropout(x)

        if self.has_bottleneck_layer:
            x = F.relu(self.fc1(x))
            # x = self.dropout(x)
            if self.tar or self.ar:
                return self.fc2(x), rnn_outs.permute(1, 0, 2)
            return self.fc2(x)
        else:
            if self.tar or self.ar:
                return self.fc1(x), rnn_outs.permute(1, 0, 2)
            return self.fc1(x)

    def update_ema(self):
        # Update the exponential moving average of the parameters (temporal averaging)
        self.steps_ema += 1
        for p, avg_p in zip(self.parameters(), self.avg_param):
            avg_p.mul_(self.beta_ema).add_((1 - self.beta_ema) * p.data)

    def load_ema_params(self):
        # Copy the bias-corrected averaged parameters into the model (e.g., for evaluation)
        for p, avg_p in zip(self.parameters(), self.avg_param):
            p.data.copy_(avg_p / (1 - self.beta_ema ** self.steps_ema))

    def load_params(self, params):
        # Restore a previously saved set of raw parameters
        for p, avg_p in zip(self.parameters(), params):
            p.data.copy_(avg_p)

    def get_params(self):
        # Return a detached copy of the current parameters
        params = deepcopy(list(p.data for p in self.parameters()))
        return params
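

# ---------------------------------------------------------------------------
# Minimal usage sketch, assuming a config object that exposes the attributes
# read in __init__. The SimpleNamespace fields and their values below are
# illustrative, not taken from the project's training scripts; `dataset` is
# stubbed to None because 'rand' mode never touches pretrained vectors. The
# EMA call pattern at the end (update after each optimizer step, swap averaged
# weights in for evaluation, then restore) is an assumed typical usage.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    cfg = SimpleNamespace(
        dataset=None,            # unused in 'rand' mode
        target_class=5,
        bidirectional=False,
        bottleneck_layer=False,
        mode='rand',             # random embeddings avoid needing pretrained vectors
        tar=False, ar=False,
        beta_ema=0.99,           # enable temporal averaging
        wdrop=0, embed_droprate=0,
        words_num=1000, words_dim=50,
        hidden_dim=64, dropout=0.2, num_layers=2,
    )

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = RegLSTM(cfg).to(device)

    # Forward pass on a fake batch of token ids: (batch=4, seq_len=20) -> (4, target_class)
    tokens = torch.randint(0, cfg.words_num, (4, 20), device=device)
    logits = model(tokens)
    print(logits.shape)

    # Assumed temporal-averaging workflow:
    model.update_ema()                # would normally follow optimizer.step()
    raw_params = model.get_params()   # keep the raw weights
    model.load_ema_params()           # evaluate with the averaged weights
    model.load_params(raw_params)     # restore before resuming training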