-
Notifications
You must be signed in to change notification settings - Fork 52
/
Copy pathmodel.py
148 lines (122 loc) · 6.38 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import torch
import torch.nn as nn
from torch.nn import init
import torch.nn.functional as F
import numpy as np
from pytorch_pretrained_bert import BertModel
from data_load import idx2trigger, argument2idx
from consts import NONE
from utils import find_triggers
class Net(nn.Module):
    """BERT-based model that classifies event triggers and their arguments.

    The model is used in two steps:

    1. :meth:`predict_triggers` encodes the batch with BERT, classifies every
       head-token position into a trigger label and builds one feature vector
       per (predicted trigger, candidate entity) pair.
    2. :meth:`predict_arguments` classifies those pair vectors into argument
       roles and assembles gold labels and structured predictions.

    ``entity_embed``, ``postag_embed``, ``rnn`` and ``fc1`` are constructed
    here but are not used by either predict method in this class; they are
    kept so the registered parameters (and therefore saved checkpoints) stay
    the same.
    """

    def __init__(self, trigger_size=None, entity_size=None, all_postags=None, postag_embedding_dim=50, argument_size=None, entity_embedding_dim=50, device=torch.device("cpu")):
        """Build the encoder and the classification heads.

        Args:
            trigger_size: Number of trigger labels (output size of ``fc_trigger``).
            entity_size: Vocabulary size of the entity-label embedding.
            all_postags: Vocabulary size of the POS-tag embedding.
            postag_embedding_dim: Dimension of the POS-tag embedding.
            argument_size: Number of argument labels (output size of ``fc_argument``).
            entity_embedding_dim: Dimension of the entity-label embedding.
            device: Device that input tensors created by the predict methods
                are moved to.
        """
        super().__init__()
        self.bert = BertModel.from_pretrained('bert-base-cased')
        self.entity_embed = MultiLabelEmbeddingLayer(num_embeddings=entity_size, embedding_dim=entity_embedding_dim, device=device)
        self.postag_embed = nn.Embedding(num_embeddings=all_postags, embedding_dim=postag_embedding_dim)
        self.rnn = nn.LSTM(bidirectional=True, num_layers=1, input_size=768 + entity_embedding_dim, hidden_size=768 // 2, batch_first=True)

        # Only the BERT output feeds the heads, so the feature size is BERT's
        # hidden size; entity/POS embeddings are not concatenated.
        hidden_size = 768
        self.fc1 = nn.Sequential(
            nn.Linear(hidden_size, hidden_size, bias=True),
            nn.ReLU(),
        )
        self.fc_trigger = nn.Sequential(
            nn.Linear(hidden_size, trigger_size),
        )
        # An argument feature is the concatenation of a trigger vector and an
        # entity vector, hence twice the hidden size.
        self.fc_argument = nn.Sequential(
            nn.Linear(hidden_size * 2, argument_size),
        )
        self.device = device

    def predict_triggers(self, tokens_x_2d, entities_x_3d, postags_x_2d, head_indexes_2d, triggers_y_2d, arguments_2d):
        """Classify triggers and build (trigger, entity) pair features.

        Args:
            tokens_x_2d: Batch of token-id sequences; converted to a LongTensor.
            entities_x_3d: Unused by this method.
            postags_x_2d: Unused by this method.
            head_indexes_2d: Per-sequence indexes into the BERT output that
                select the positions to keep; converted to a LongTensor.
            triggers_y_2d: Gold trigger label ids; converted to a LongTensor
                and returned unchanged otherwise.
            arguments_2d: One mapping per sequence. ``['candidates']`` is read
                here as an iterable of hashable
                ``(start, end, entity_type)`` triples.

        Returns:
            A tuple ``(trigger_logits, triggers_y_2d, trigger_hat_2d,
            argument_hidden, argument_keys)`` where ``argument_hidden`` is a
            list of 1-D tensors (trigger vector concatenated with entity
            vector) and ``argument_keys`` is the parallel list of
            ``(i, t_start, t_end, t_type_str, e_start, e_end, e_type_str)``.
            Both lists are empty when no trigger is predicted or there are no
            candidates.
        """
        tokens_x_2d = torch.LongTensor(tokens_x_2d).to(self.device)
        triggers_y_2d = torch.LongTensor(triggers_y_2d).to(self.device)
        head_indexes_2d = torch.LongTensor(head_indexes_2d).to(self.device)

        # Keep BERT's mode in sync with this module and skip graph building
        # when evaluating.
        self.bert.train(self.training)
        if self.training:
            encoded_layers, _ = self.bert(tokens_x_2d)
        else:
            with torch.no_grad():
                encoded_layers, _ = self.bert(tokens_x_2d)
        enc = encoded_layers[-1]  # [batch_size, seq_len, hidden_size]

        # Select the head positions of every sequence in one out-of-place
        # gather: x[b, s, :] = enc[b, head_indexes_2d[b, s], :]. This avoids
        # overwriting the encoder output in place row by row.
        gather_index = head_indexes_2d.unsqueeze(-1).expand(-1, -1, enc.size(-1))
        x = torch.gather(enc, 1, gather_index)

        batch_size = tokens_x_2d.shape[0]
        trigger_logits = self.fc_trigger(x)
        trigger_hat_2d = trigger_logits.argmax(-1)

        argument_hidden, argument_keys = [], []
        for i in range(batch_size):
            candidates = arguments_2d[i]['candidates']

            # Mean-pool each candidate entity span once per sequence.
            golden_entity_tensors = {}
            for candidate in candidates:
                e_start, e_end, _ = candidate
                golden_entity_tensors[candidate] = x[i, e_start:e_end].mean(dim=0)

            predicted_labels = [idx2trigger[trigger] for trigger in trigger_hat_2d[i].tolist()]
            for t_start, t_end, t_type_str in find_triggers(predicted_labels):
                event_tensor = x[i, t_start:t_end].mean(dim=0)
                # Pair every predicted trigger with every candidate entity.
                for candidate in candidates:
                    e_start, e_end, e_type_str = candidate
                    entity_tensor = golden_entity_tensors[candidate]
                    argument_hidden.append(torch.cat([event_tensor, entity_tensor]))
                    argument_keys.append((i, t_start, t_end, t_type_str, e_start, e_end, e_type_str))

        return trigger_logits, triggers_y_2d, trigger_hat_2d, argument_hidden, argument_keys

    def predict_arguments(self, argument_hidden, argument_keys, arguments_2d):
        """Classify pair features into argument labels.

        Args:
            argument_hidden: Non-empty list of 1-D pair feature tensors as
                produced by :meth:`predict_triggers`. ``torch.stack`` is
                applied to it, so callers are expected to skip this method
                when the list is empty.
            argument_keys: Parallel list of
                ``(i, t_start, t_end, t_type_str, e_start, e_end, e_type_str)``.
            arguments_2d: One mapping per sequence. ``['events']`` is read as
                a mapping from ``(t_start, t_end, t_type_str)`` to an iterable
                of ``(a_start, a_end, a_type_idx)``.

        Returns:
            A tuple ``(argument_logits, arguments_y_1d, argument_hat_1d,
            argument_hat_2d)``. ``arguments_y_1d`` holds the gold label of
            every pair (the ``NONE`` label when the pair is not a gold
            argument). ``argument_hat_2d`` has one ``{'events': {...}}`` dict
            per sequence, grouping non-``NONE`` predictions by trigger key as
            lists of ``(e_start, e_end, label)``.
        """
        argument_hidden = torch.stack(argument_hidden)
        argument_logits = self.fc_argument(argument_hidden)
        argument_hat_1d = argument_logits.argmax(-1)

        none_label = argument2idx[NONE]

        # Gold label per pair: the role of the first gold argument whose span
        # matches the entity span exactly, otherwise NONE.
        arguments_y_1d = []
        for i, t_start, t_end, t_type_str, e_start, e_end, e_type_str in argument_keys:
            a_label = none_label
            event_key = (t_start, t_end, t_type_str)
            gold_events = arguments_2d[i]['events']
            if event_key in gold_events:
                for (a_start, a_end, a_type_idx) in gold_events[event_key]:
                    if e_start == a_start and e_end == a_end:
                        a_label = a_type_idx
                        break
            arguments_y_1d.append(a_label)
        arguments_y_1d = torch.LongTensor(arguments_y_1d).to(self.device)

        # Group the non-NONE predictions by sequence and trigger.
        batch_size = len(arguments_2d)
        argument_hat_2d = [{'events': {}} for _ in range(batch_size)]
        for (i, st, ed, event_type_str, e_st, e_ed, entity_type), a_label in zip(argument_keys, argument_hat_1d.cpu().numpy()):
            if a_label == none_label:
                continue
            argument_hat_2d[i]['events'].setdefault((st, ed, event_type_str), []).append((e_st, e_ed, a_label))

        return argument_logits, arguments_y_1d, argument_hat_1d, argument_hat_2d
# Reused from https://github.com/lx865712528/EMNLP2018-JMEE
class MultiLabelEmbeddingLayer(nn.Module):
    """Embed a variable-length list of label ids per position and sum them.

    Each input cell ``x[i][j]`` is a list of ids; the embeddings of those ids
    are added together, giving one vector per ``(i, j)`` position.
    """

    def __init__(self,
                 num_embeddings=None, embedding_dim=None,
                 dropout=0.5, padding_idx=0,
                 max_norm=None, norm_type=2,
                 device=torch.device("cpu")):
        """Create the embedding table and move the layer to ``device``.

        Args:
            num_embeddings: Number of rows in the embedding table.
            embedding_dim: Size of each embedding vector.
            dropout: Dropout probability applied to the output, or ``None``
                to disable dropout.
            padding_idx: Forwarded to ``nn.Embedding``.
            max_norm: Forwarded to ``nn.Embedding``.
            norm_type: Forwarded to ``nn.Embedding``.
            device: Device the layer lives on and index tensors are moved to.
        """
        super().__init__()
        self.matrix = nn.Embedding(
            num_embeddings,
            embedding_dim,
            padding_idx=padding_idx,
            max_norm=max_norm,
            norm_type=norm_type,
        )
        self.dropout = dropout
        self.device = device
        self.to(device)

    def forward(self, x):
        """Return summed label embeddings of shape ``[len(x), len(x[0]), dim]``.

        Args:
            x: Nested sequence where ``x[i][j]`` is a list of label ids. The
                second dimension is taken from ``x[0]`` for every row.

        Returns:
            A tensor with one summed embedding per ``(i, j)`` position, with
            dropout applied when ``self.dropout`` is not ``None``.
        """
        n_rows = len(x)
        n_cols = len(x[0])

        summed = []
        for row in range(n_rows):
            for col in range(n_cols):
                label_ids = torch.LongTensor(x[row][col]).to(self.device)
                # Adding the per-label vectors collapses the label axis.
                summed.append(self.matrix(label_ids).sum(0))

        out = torch.stack(summed).view(n_rows, n_cols, -1)
        if self.dropout is None:
            return out
        return F.dropout(out, p=self.dropout, training=self.training)