#!/usr/bin/env python
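"""
Crawl view counts of LBRY streams.

Repeatedly samples random stream claims from a local claims database
(config.claims_db_file), skips NSFW-tagged claims, looks up their view
counts, and records streams that pass the view and LBC thresholds in
db/view_crawler.db. Every so often it writes JSON summaries of the
most-viewed streams (json/view_crawler.json) and of the fastest-growing
streams over the last 90 days (json/view_crawler_recent.json).
"""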
import apsw
import config
import json
from misc import get_view_counts
import numpy as np
import numpy.random as rng
import time
VIEWS_THRESHOLD = 100
LBC_THRESHOLD = 0.0
SLEEP = 10.0
NUM_PER_API_CALL = 1000
# Database connections
conn = apsw.Connection("db/view_crawler.db")
db = conn.cursor()
db.execute("PRAGMA AUTOVACUUM = ON;")

def initialise_database():
    """Create the tables, metadata row, and indexes if they don't exist yet."""
    db.execute("PRAGMA JOURNAL_MODE=WAL;")
    db.execute("BEGIN;")
    db.execute("""
        CREATE TABLE IF NOT EXISTS streams
            (claim_hash BYTES PRIMARY KEY,
             name TEXT NOT NULL,
             title TEXT)
        WITHOUT ROWID;
        """)
    db.execute("""
        CREATE TABLE IF NOT EXISTS stream_measurements
            (id INTEGER PRIMARY KEY,
             time REAL NOT NULL,
             stream BYTES NOT NULL,
             views INTEGER NOT NULL DEFAULT 0,
             lbc REAL NOT NULL DEFAULT 0.0,
             FOREIGN KEY (stream) REFERENCES streams (claim_hash));
        """)
    db.execute("""CREATE TABLE IF NOT EXISTS metadata
                    (id INTEGER PRIMARY KEY,
                     name TEXT UNIQUE NOT NULL,
                     value INTEGER);""")
    db.execute("""
        INSERT INTO metadata (name, value)
        VALUES ('lbry_api_calls', 0)
        ON CONFLICT (name) DO NOTHING;
        """)

    # An index. Could probably be improved.
    db.execute("""CREATE INDEX IF NOT EXISTS idx1 ON stream_measurements
                    (views DESC, stream);""")
    db.execute("CREATE INDEX IF NOT EXISTS sv_idx ON stream_measurements (stream, views);")
    db.execute("COMMIT;")

def do_api_call():
    """Sample random claims, fetch their view counts, and record the results."""
    now = time.time()

    # Get the range of rowids in the claims table
    claims_db = apsw.Connection(config.claims_db_file,
                                flags=apsw.SQLITE_OPEN_READONLY)
    claims_db.setbusytimeout(5000)
    cdb = claims_db.cursor()
    cdb.execute("BEGIN;")
    result = cdb.execute("SELECT MIN(rowid), MAX(rowid) FROM claim;")\
                .fetchall()[0]
    min_rowid, max_rowid = result
    cdb.execute("COMMIT;")

    # Put up to NUM_PER_API_CALL claim hashes in here
    measurements = dict()
    while len(measurements) < NUM_PER_API_CALL:
        rowid = min_rowid + rng.randint(max_rowid - min_rowid + 1)
        cdb.execute("BEGIN;")
        row = cdb.execute("""SELECT claim_hash,
                                    claim_name,
                                    (amount+support_amount)/1E8 lbc,
                                    title
                             FROM claim
                             WHERE claim_type=1 AND rowid=?
                                AND lbc >= ?;""",
                          (rowid, LBC_THRESHOLD)).fetchone()
        cdb.execute("COMMIT;")
        if row is not None:
            # Skip claims carrying any of the NSFW tags
            cdb.execute("BEGIN;")
            nsfw = cdb.execute("""SELECT COUNT(*) FROM tag
                                  WHERE claim_hash = ?
                                  AND tag IN
                                      ('nsfw', 'xxx', 'sex',
                                       'porn', 'mature')""", (row[0], )
                               ).fetchone()[0]
            cdb.execute("COMMIT;")
            if nsfw == 0:
                measurements[row[0]] = dict(name=row[1], lbc=row[2],
                                            title=row[3])

    # Get the view counts and prepare to add to DB
    claim_hashes = list(measurements.keys())
    claim_ids = [ch[::-1].hex() for ch in claim_hashes]
    views = get_view_counts(claim_ids)

    # Rows to insert new streams and new measurements
    zipped0 = []
    zipped1 = []

    # Rows for adding titles
    zipped2 = []
    for i in range(len(views)):
        ch = claim_hashes[i]
        zipped0.append((ch, measurements[ch]["name"]))
        if views[i] >= VIEWS_THRESHOLD and \
                measurements[ch]["lbc"] >= LBC_THRESHOLD:
            zipped1.append((now, ch, views[i], measurements[ch]["lbc"]))

            # Only bother with the title if a measurement is made
            zipped2.append((measurements[ch]["title"], ch))

    db.execute("BEGIN;")
    db.executemany("""INSERT INTO streams (claim_hash, name)
                      VALUES (?, ?)
                      ON CONFLICT (claim_hash) DO NOTHING;""", zipped0)
    db.executemany("""UPDATE streams SET title=? WHERE claim_hash=?;""",
                   zipped2)
    db.executemany("""INSERT INTO stream_measurements (time, stream, views, lbc)
                      VALUES (?, ?, ?, ?);""", zipped1)
    db.execute("UPDATE metadata SET value=value+1 WHERE name='lbry_api_calls';")
    db.execute("COMMIT;")

    claims_db.close()
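
# Note on the hash/ID conversion above: the hex claim IDs passed to
# get_view_counts() (and shown in Odysee URLs) are the byte-reversed form of
# the claim_hash blobs stored in the databases, hence the [::-1] in
# do_api_call() above and in read_top()/popular_recently() below, e.g.
#
#     claim_id = claim_hash[::-1].hex()
#     claim_hash = bytes.fromhex(claim_id)[::-1]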

def status():
    """Return a dict of basic counters describing the crawler's progress."""
    # Preparing for when we'll want more status measurements
    result = dict()
    result["lbry_api_calls"] = db.execute("""SELECT value FROM metadata WHERE
                                             name='lbry_api_calls';""")\
                                 .fetchone()[0]
    result["streams_in_db"] = db.execute("""SELECT COUNT(claim_hash)
                                            FROM streams;""").fetchone()[0]
    result["measurements_in_db"] = db.execute("""SELECT COUNT(id)
                                                 FROM stream_measurements;""")\
                                     .fetchone()[0]
    return result
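
# status() feeds the json.dumps() call in the main loop below; its return value
# is a flat dict of counters, e.g. (values illustrative only):
#
#     {"lbry_api_calls": 1234, "streams_in_db": 567, "measurements_in_db": 8901}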

def read_top(num=1000):
    """Return the `num` most-viewed streams, ranked by their highest recorded
    view count."""
    k = 1
    result = dict()
    result["ranks"] = []
    result["names"] = []
    result["titles"] = []
    result["claim_ids"] = []
    result["tv_urls"] = []
    result["views"] = []
    for row in db.execute("""SELECT s.claim_hash, s.name, s.title, MAX(sm.views) v
                             FROM streams s INNER JOIN stream_measurements sm
                                ON s.claim_hash = sm.stream
                             GROUP BY s.claim_hash
                             ORDER BY v DESC LIMIT ?;""", (num, )):
        claim_id = row[0][::-1].hex()
        result["ranks"].append(k)
        result["names"].append(row[1])
        title = row[2]
        if title is None:
            title = "Not yet scraped :-("
        result["titles"].append(title)
        result["claim_ids"].append(claim_id)

        # URLs of the form https://odysee.com/<name>:<claim_id>
        result["tv_urls"].append("https://odysee.com/" + result["names"][-1]
                                 + ":" + claim_id)
        result["views"].append(row[3])
        k += 1
    return result

def popular_recently(num=1000):
    """Return the `num` streams with the highest view rate (views per day)
    over roughly the last 90 days."""
    start = time.time() - 90.0*86400.0
    k = 1
    result = dict()
    result["ranks"] = []
    result["names"] = []
    result["titles"] = []
    result["claim_ids"] = []
    result["tv_urls"] = []
    result["view_rates"] = []

    # View rate in views/second; the 7200 s in the denominator avoids dividing
    # by zero when a stream has only one recent measurement.
    for row in db.execute("""
            -- Recent measurements
            SELECT stream, name, title,
                   (MAX(views) - MIN(views))/(MAX(time) - MIN(time) + 7200.0) view_rate
            FROM
                (SELECT * FROM stream_measurements sm
                 INNER JOIN streams s ON sm.stream = s.claim_hash
                 WHERE time >= ?)
            GROUP BY stream
            ORDER BY view_rate DESC
            LIMIT ?;""", (start, num)):
        claim_id = row[0][::-1].hex()
        result["ranks"].append(k)
        result["names"].append(row[1])
        title = row[2]
        if title is None:
            title = "Not yet scraped :-("
        result["titles"].append(title)
        result["claim_ids"].append(claim_id)
        result["tv_urls"].append("https://odysee.com/" + result["names"][-1]
                                 + ":" + claim_id)

        # Convert views/second to views/day
        result["view_rates"].append(row[3]*86400.0)
        k += 1
    return result

if __name__ == "__main__":
    initialise_database()

    k = 1
    while True:
        print(f"Checking {NUM_PER_API_CALL} streams...", end="", flush=True)
        try:
            do_api_call()
            s = json.dumps(status(), indent=4)
            print(f"done.\nStatus:\n{s}.\n\n", end="", flush=True)
        except Exception:
            print("Something went wrong.\n\n", end="", flush=True)
        time.sleep(SLEEP)
        k += 1

        if k % 100 == 0:
            print("Creating JSON of top viewed (all time)...", end="", flush=True)
            with open("json/view_crawler.json", "w") as f:
                json.dump(read_top(), f, indent=2)
            print("done.\n\n", end="", flush=True)

        if k % 100 == 0:
            print("Creating JSON of top recent view rates...", end="", flush=True)
            with open("json/view_crawler_recent.json", "w") as f:
                json.dump(popular_recently(), f, indent=2)
            print("done.\n\n", end="", flush=True)

            print("Resting for two minutes.", flush=True)
            time.sleep(120)

        if k % 1000 == 0:
            print("Cleaning up database...", end="", flush=True)
            db.execute("PRAGMA main.WAL_CHECKPOINT(TRUNCATE);")
            print("done.\n\n", end="", flush=True)