This repository has been archived by the owner on Mar 18, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcheckbookmarks.py
117 lines (98 loc) · 3.98 KB
/
checkbookmarks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# Copyright (C) 2017 İ. Göktuğ Kayaalp <self at gkayaalp dot com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"Check the health of Firefox bookmarks."
import argparse
import itertools as itert
import multiprocessing as mp
import socket
import sqlite3
import sys
import urllib.request as rq
from urllib.error import URLError
# SQL returning the URL of every row of moz_places referenced by a bookmark.
query = "select url from moz_places inner join moz_bookmarks on moz_places.id = moz_bookmarks.fk"
# User-Agent header sent with each request.
useragent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36'
# Defaults for the settings below; cli() rebinds them from the command line.
report_success = False
report_redirects = False
timeout = 10
database = "places.sqlite"
njobs = 10
nresults = None


def iter_bookmark_urls(path=None):
    """Yield the bookmarked URLs that start with "http".

    This is a generator, so the database is not opened until the first
    item is requested.  That matters: the module-level ``database`` name
    is looked up at that moment, so a value assigned after import (for
    instance by ``cli``) is the one that gets used, and merely importing
    this module no longer touches the file system.

    path -- database file to read; when None, the current value of the
            module-level ``database`` is used.
    """
    connection = sqlite3.connect(database if path is None else path)
    try:
        # Fetch everything up front so the connection can be closed before
        # any (slow) URL checking starts.
        rows = connection.execute(query).fetchall()
    finally:
        connection.close()
    for (url,) in rows:
        # A NULL url comes back as None; skip it instead of crashing.
        if url and url.startswith("http"):
            yield url


# Lazy iterable of the URLs to check; consumed once.
results = iter_bookmark_urls()
# Outcome queues filled by the checks: (url, status) and (url, error text).
success = mp.Queue()
failure = mp.Queue()
def check1(url, timeout=10):
    """Open *url* once and report the URL it resolved to and its status.

    The request carries the module-level ``useragent`` as its User-Agent
    header and gives up after *timeout* seconds.

    Returns ``(url, status)`` when the response URL equals the requested
    one, otherwise ``([url, response_url], status)``.

    Any exception raised while building or opening the request
    (``socket.timeout`` included) propagates to the caller.
    """
    headers = {'User-Agent': useragent}
    with rq.urlopen(rq.Request(url, headers=headers), timeout=timeout) as response:
        landed_on = response.url
        status = response.status
    if landed_on != url:
        return ([url, landed_on], status)
    return (url, status)
def check(url):
    """Check one URL and record the outcome in ``success`` or ``failure``.

    The URL is fetched with ``check1`` using the module-level ``timeout``.
    On success ``(url, status)`` is put on the ``success`` queue, and a
    "Success" line is printed when ``report_success`` or
    ``report_redirects`` is set; a redirect is then shown (and queued) as
    "requested => final".  With reporting off, a redirect is queued as the
    ``[requested, final]`` list returned by ``check1``.

    Any exception is treated as a dead link: an "Error" line is printed
    and ``(url, message)`` is put on the ``failure`` queue.  Nothing is
    raised and nothing is returned.
    """
    try:
        # Use a separate name for the result so the error path below always
        # reports the URL that was originally requested.
        final, status = check1(url, timeout=timeout)
        if report_redirects or report_success:
            if isinstance(final, list):
                final = " => ".join(final)
            print("Success '{}': {}".format(final, status))
        success.put((final, status))
    except Exception as error:
        # Deliberately broad: one bad URL must not abort the whole run.
        errstr = str(error)
        print("Error '{}': {}".format(url, errstr))
        failure.put((url, errstr))
def run(nprocs, nresults):
    """Check the URLs in ``results`` in parallel and print a summary.

    nprocs   -- number of worker processes in the pool.
    nresults -- check at most this many URLs; a falsy value (None or 0)
                means all of them.

    The totals are taken from the sizes of the ``success`` and ``failure``
    queues after the pool has finished.
    """
    urls = itert.islice(results, nresults) if nresults else results
    with mp.Pool(processes=nprocs) as pool:
        pool.map(check, urls)
    nsuccess = success.qsize()
    nfailure = failure.qsize()
    total = nsuccess + nfailure
    # With nothing checked there is no ratio to compute; report zero
    # instead of dividing by zero.
    linkrot = nfailure / total * 100 if total else 0.0
    print("Checked {} urls: {} healthy, {} dead (%{} linkrot)".format(
        total, nsuccess, nfailure, linkrot))
def cli(args):
    """Parse command-line arguments and store them in the module globals.

    args -- list of argument strings, without the program name.

    Rebinds ``database`` unconditionally, and ``useragent``, ``timeout``,
    ``njobs``, ``nresults``, ``report_success`` and ``report_redirects``
    when the matching option is given.  ``-v`` enables success reporting
    and ``-vv`` additionally enables redirect reporting.  A numeric option
    given as 0 is treated as not given.

    argparse exits the process on invalid arguments or ``-h``.
    """
    global report_success, report_redirects, timeout, database
    global njobs, nresults, useragent
    p = argparse.ArgumentParser(
        description="Check the health of Firefox bookmarks.")
    p.add_argument("db", metavar="DATABASE", type=str,
                   help="the path to places.sqlite file (preferably a copy,"
                        " not the actual one Firefox uses)")
    p.add_argument("-a", type=str, metavar="USER-AGENT",
                   help="the user agent string to use when making requests,"
                        " by default set to resemble a web browser")
    p.add_argument("-t", metavar="SECONDS", type=int,
                   help="connection timeout, default: %d seconds" % timeout)
    p.add_argument("-j", metavar="JOBS", type=int,
                   help="how many concurrent jobs to run, default: %d jobs" % njobs)
    p.add_argument("-r", metavar="RESULTS", type=int,
                   help="check at most this much urls, by default all of them are"
                        " processed")
    p.add_argument("-v", action="count", help="be verbose")
    opts = p.parse_args(args)
    # action="count" leaves the value as None when -v is absent.
    verbosity = opts.v or 0
    if verbosity >= 1:
        report_success = True
    if verbosity >= 2:
        report_redirects = True
    if opts.a:
        useragent = opts.a
    if opts.t:
        timeout = opts.t
    if opts.j:
        njobs = opts.j
    if opts.r:
        nresults = opts.r
    database = opts.db
if __name__ == "__main__":
    # Configure the module globals from the command line, then run the
    # checks with the resulting job count and result limit.
    command_line = sys.argv[1:]
    cli(command_line)
    run(nprocs=njobs, nresults=nresults)