-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathBPR.py
109 lines (100 loc) · 4.17 KB
/
BPR.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Implement BPR.
# Steffen Rendle, et al. BPR: Bayesian personalized ranking from implicit feedback.
# Proceedings of the twenty-fifth conference on uncertainty in artificial intelligence. AUAI, 2009.
# @author Runlong Yu, Mingyue Cheng, Weibo Gao
import random
from collections import defaultdict
import numpy as np
from sklearn.metrics import roc_auc_score
import scores
class BPR:
user_count = 943
item_count = 1682
latent_factors = 20
lr = 0.01
reg = 0.01
train_count = 1000
train_data_path = 'train.txt'
test_data_path = 'test.txt'
size_u_i = user_count * item_count
# latent_factors of U & V
U = np.random.rand(user_count, latent_factors) * 0.01
V = np.random.rand(item_count, latent_factors) * 0.01
biasV = np.random.rand(item_count) * 0.01
test_data = np.zeros((user_count, item_count))
test = np.zeros(size_u_i)
predict_ = np.zeros(size_u_i)
def load_data(self, path):
user_ratings = defaultdict(set)
with open(path, 'r') as f:
for line in f.readlines():
u, i = line.split(" ")
u = int(u)
i = int(i)
user_ratings[u].add(i)
return user_ratings
def load_test_data(self, path):
file = open(path, 'r')
for line in file:
line = line.split(' ')
user = int(line[0])
item = int(line[1])
self.test_data[user - 1][item - 1] = 1
def train(self, user_ratings_train):
for user in range(self.user_count):
# sample a user
u = random.randint(1, self.user_count)
if u not in user_ratings_train.keys():
continue
# sample a positive item from the observed items
i = random.sample(user_ratings_train[u], 1)[0]
# sample a negative item from the unobserved items
j = random.randint(1, self.item_count)
while j in user_ratings_train[u]:
j = random.randint(1, self.item_count)
u -= 1
i -= 1
j -= 1
r_ui = np.dot(self.U[u], self.V[i].T) + self.biasV[i]
r_uj = np.dot(self.U[u], self.V[j].T) + self.biasV[j]
r_uij = r_ui - r_uj
loss_func = -1.0 / (1 + np.exp(r_uij))
# update U and V
self.U[u] += -self.lr * (loss_func * (self.V[i] - self.V[j]) + self.reg * self.U[u])
self.V[i] += -self.lr * (loss_func * self.U[u] + self.reg * self.V[i])
self.V[j] += -self.lr * (loss_func * (-self.U[u]) + self.reg * self.V[j])
# update biasV
self.biasV[i] += -self.lr * (loss_func + self.reg * self.biasV[i])
self.biasV[j] += -self.lr * (-loss_func + self.reg * self.biasV[j])
def predict(self, user, item):
predict = np.mat(user) * np.mat(item.T)
return predict
def main(self):
user_ratings_train = self.load_data(self.train_data_path)
self.load_test_data(self.test_data_path)
for u in range(self.user_count):
for item in range(self.item_count):
if int(self.test_data[u][item]) == 1:
self.test[u * self.item_count + item] = 1
else:
self.test[u * self.item_count + item] = 0
# training
for i in range(self.train_count):
self.train(user_ratings_train)
predict_matrix = self.predict(self.U, self.V)
# prediction
self.predict_ = predict_matrix.getA().reshape(-1)
self.predict_ = pre_handel(user_ratings_train, self.predict_, self.item_count)
auc_score = roc_auc_score(self.test, self.predict_)
print('AUC:', auc_score)
# Top-K evaluation
scores.topK_scores(self.test, self.predict_, 5, self.user_count, self.item_count)
def pre_handel(set, predict, item_count):
# Ensure the recommendation cannot be positive items in the training set.
for u in set.keys():
for j in set[u]:
predict[(u - 1) * item_count + j - 1] = 0
return predict
if __name__ == '__main__':
bpr = BPR()
bpr.main()