#!/usr/bin/env python
# (compatible with both Python 2 and Python 3)
"""webcheck.py v1.604 (c) 2014-25 Silas S. Brown.
License: Apache 2""" # (see below)
# See webcheck.html for description and usage instructions
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# CHANGES
# -------
# If you want to compare this code to old versions, most old
# versions are being kept on SourceForge's E-GuideDog SVN repository
# http://sourceforge.net/p/e-guidedog/code/HEAD/tree/ssb22/setup/
# use: svn co http://svn.code.sf.net/p/e-guidedog/code/ssb22/setup
# and on GitHub at https://github.com/ssb22/web-imap-etc
# and on GitLab at https://gitlab.com/ssb22/web-imap-etc
# and on Bitbucket https://bitbucket.org/ssb22/web-imap-etc
# and at https://gitlab.developers.cam.ac.uk/ssb22/web-imap-etc
# and in China: https://gitee.com/ssb22/web-imap-etc
max_threads = 10
keep_etags = False # if True, will also keep any ETag headers as well as Last-Modified
verify_SSL_certificates = False # webcheck's non-Webdriver URLs are for monitoring public services and there's not a lot of point in SSL authentication; failures due to server/client certificate misconfigurations are more trouble than they're worth
import traceback, time, pickle, gzip, re, os, sys, socket, hashlib
try: import htmlentitydefs # Python 2
except ImportError: import html.entities as htmlentitydefs # Python 3
try: from HTMLParser import HTMLParser # Python 2
except ImportError: # Python 3
from html.parser import HTMLParser as _HTMLParser
class HTMLParser(_HTMLParser):
def __init__(self): _HTMLParser.__init__(self,convert_charrefs=False)
try: from commands import getoutput
except: from subprocess import getoutput
try: import urlparse # Python 2
except ImportError: import urllib.parse as urlparse # Python 3
try: from StringIO import StringIO # Python 2
except: from io import BytesIO as StringIO # Python 3
try: import Queue # Python 2
except: import queue as Queue # Python 3
try: unichr # Python 2
except: unichr,xrange = chr,range # Python 3
try: from urllib2 import quote,HTTPCookieProcessor,HTTPErrorProcessor,build_opener,HTTPSHandler,urlopen,Request,HTTPError,URLError # Python 2
except: # Python 3
from urllib.parse import quote
from urllib.request import HTTPCookieProcessor,build_opener,HTTPSHandler,urlopen,Request,HTTPErrorProcessor
from urllib.error import HTTPError,URLError
def B(s): # byte-string from "" literal
if type(s)==type("")==type(u""): return s.encode('utf-8') # Python 3
else: return s # Python 2
def S(b):
if type(b)==type(""): return b # Python 2
else: return b.decode('utf-8') # Python 3
def U(s):
if type(s)==type(u""): return s
return s.decode('utf-8')
def UL(s):
if type(s)==type(u""): return s
return s.decode('latin1')
def writeBuf(f,w):
if hasattr(f,"buffer"): # Python 3
f.flush() # ensure ordering
f.buffer.write(w)
f.buffer.flush() # ensure ordering
else: f.write(w) # Python 2
try: import ssl
except: # you won't be able to check https:// URLs
ssl = 0 ; verify_SSL_certificates = False
if '--single-thread' in sys.argv: max_threads = 1 # use --single-thread if something gets stuck and you need Ctrl-C to generate a meaningful traceback
if max_threads > 1:
try: import thread # Python 2
except ImportError: import _thread as thread # Python 3
default_filename = "webcheck" + os.extsep + "list"
def read_input_file(fname=default_filename):
if os.path.isdir(fname): # support webcheck.list etc as a directory
ret = [] ; files = os.listdir(fname)
if default_filename in files: # do this one first
ret += read_input_file(fname+os.sep+default_filename)
files.remove(default_filename)
for f in files:
if f.endswith("~") or f.lower().endswith(".bak"): continue # ignore
ret += [(l+" # from "+f) for l in read_input_file(fname+os.sep+f)]
return ret
try: o = open(fname)
except: return [] # not a file or resolvable link to one, e.g. lockfile in a webcheck.list dir
lines = o.read().replace("\r","\n").split("\n")
lines.reverse() # so can pop() them in order
return lines
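# read_input() parses webcheck.list; see webcheck.html for the authoritative format.
# Illustrative sketch (example.org and the text are made up):
#   daily:
#   https://example.org/page some text that should still be on the page  # optional comment
#   https://example.org/feed.xml
# i.e. a frequency line (daily/weekly/monthly/"days N"), then URL + text to check for,
# or a URL on its own to treat it as an RSS/Atom feed; ":include file", "else:",
# "also:", "Header: value" and {webdriver action} lines are handled below.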
def read_input():
ret = {} # domain -> { url -> checklist [(days,text,elseLogic)] }
# elseLogic = None or (url,checklist)
days = 0 ; extraHeaders = []
url = mainDomain = None
lines = read_input_file()
if not lines: print ("No input found")
lastList = None # refs a list within ret, of [(days,text,elseLogic)]
while lines:
line = line_withComment = " ".join(lines.pop().split())
if " #" in line: line = line[:line.index(" #")].strip()
if not line or line_withComment[0]=='#': continue
if line.startswith(":include"):
lines += [(l+" # from "+line) for l in read_input_file(line.split(None,1)[1])]
continue
if line.endswith(':'): freqCmd = line[:-1]
else: freqCmd = line
if freqCmd.lower()=="daily": days = 1
elif freqCmd.lower()=="weekly": days = 7
elif freqCmd.lower()=="monthly": days = 30
elif freqCmd.startswith("days"): days=int(freqCmd.split()[1])
else: freqCmd = None
if freqCmd: continue
if line.startswith("PYTHONPATH="):
sys.path = line.split("=",1)[1].replace("$PYTHONPATH:","").replace(":$PYTHONPATH","").split(":") + sys.path # for importing selenium etc, if it's not installed system-wide
continue
if line.startswith("PATH="):
os.environ["PATH"] = ":".join(line.split("=",1)[1].replace("$PATH:","").replace(":$PATH","").split(":") + os.environ.get("PATH","").split(":"))
continue
isElse = False
if line.startswith("else:"):
isElse = True
line=line[5:].lstrip()
line_withComment=line_withComment[5:].lstrip()
assert line, "else: must be followed on same line"
if line.startswith('also:') and url:
text = line_withComment[5:].strip()
# and leave url and mainDomain as-is (same as above line), TODO: interaction of 'also:' (and extra headers lines) with 'else:' might not be what users expect
elif ':' in line.split()[0] and not line.split(':',1)[1].startswith('//'):
header, value = line.split(':',1) ; value=value.strip()
if not value or header.lower()=='user-agent': # no value = delete header; user-agent can be set only once so auto-delete any previous setting
for e in extraHeaders:
if e.startswith(header+':'): extraHeaders.remove(e)
if value: extraHeaders.append(line)
continue
elif line.startswith("c://") and ' ; ' in line_withComment: # shell command (and if a comment character comes before the " ; " we can cope with that)
url, text = line_withComment.split(' ; ',1)
elif line.startswith("c://"): # command that gives RSS
url, text = line, line_withComment[len(line):].lstrip() # RSS only, possibly with comment
elif line.startswith('{') and '}' in line_withComment: # webdriver
actions = line_withComment[1:line_withComment.index('}')].split()
balanceBrackets(actions)
text = line_withComment[line_withComment.index('}')+1:].strip()
mainDomain = '.'.join(urlparse.urlparse(actions[0]).netloc.rsplit('.',2)[-2:]) # assumes 1st action is a URL
url = "wd://"+chr(0).join(actions)
if extraHeaders: url += '\n'+'\n'.join(extraHeaders)
else: # not webdriver
lSplit = line_withComment.split(None,1)
if len(lSplit)==1: url, text = lSplit[0],"" # RSS only
else: url, text = lSplit
assert "://" in url
mainDomain = '.'.join(urlparse.urlparse(url).netloc.rsplit('.',2)[-2:])
if extraHeaders: url += '\n'+'\n'.join(extraHeaders)
if isElse:
assert lastList, "else without suitable rule before it"
lastList[-1] = lastList[-1][:2] + ((url,[(0,text,None)]),) # must be days=0 because don't want to re-check the days count when just retrieved and failed something possibly on same URL ('else:' can be used for simple retrying)
lastList = lastList[-1][2][1] # ref the above list (the one after url), so 'else' can be used as 'else if' and next iteration extends it
else:
lastList = ret.setdefault({
# domains to treat as equivalent for rate reduce
"superuser.com":"stackoverflow.com",
"stackexchange.com":"stackoverflow.com",
"askubuntu.com":"stackoverflow.com",
}.get(mainDomain,mainDomain),{}).setdefault(url,[])
lastList.append((days,text,None))
return ret
def balanceBrackets(wordList):
"For webdriver instructions: merge adjacent items of wordList so each item has balanced square brackets (currently checks only start and end of each word; if revising this, be careful about use on URLs). Also checks quotes (TODO: make sure that doesn't interfere with brackets)."
bracketLevel = 0 ; i = 0
while i < len(wordList)-1:
blOld = bracketLevel
if wordList[i][0] in '["': bracketLevel += 1
elif not bracketLevel and (('->"' in wordList[i] and not wordList[i].endswith('->"')) or '="' in wordList[i]): bracketLevel += 1
if wordList[i][-1] in ']"': bracketLevel -= 1
if bracketLevel > 0:
wordList [i] += " "+wordList[i+1]
del wordList[i+1] ; bracketLevel = blOld
else:
i += 1 ; bracketLevel = 0
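# HTMLStrings collects the visible text of a page: script/style contents are skipped,
# whitespace is collapsed, and character/entity references are decoded.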
class HTMLStrings(HTMLParser):
def __init__(self):
HTMLParser.__init__(self)
self.theTxt = []
self.omit = False
def handle_data(self, data):
if self.omit or not data: return
elif not data.strip(): self.ensure(' ')
else:
d2 = data.lstrip()
if not d2==data: self.ensure(' ') # (always collapse multiple spaces, even across tags)
if d2: self.theTxt.append(re.sub('[ \t\r\n]+',' ',d2.replace(unichr(160).encode('utf-8').decode('latin1'),' ')))
def ensure(self,thing):
if self.theTxt and self.theTxt[-1].endswith(thing): return
self.theTxt.append(thing)
def handle_starttag(self, tag, attrs):
if tag in "p br div h1 h2 h3 h4 h5 h6 th tr td table dt dd".split(): self.ensure(' ') # space rather than newline because we might want to watch for a string that goes across headings etc
elif tag in ["script","style"]: self.omit=True
def handle_endtag(self, tag):
if tag in ["script","style"]: self.omit=False
def handle_startendtag(self, tag, attrs):
self.handle_starttag(tag,attrs)
self.handle_endtag(tag)
def unescape(self,attr): return attr # as we don't use attrs above, no point trying to unescape them and possibly falling over if something's malformed
def handle_charref(self,ref):
if ref.startswith('x'): self.handle_data(unichr(int(ref[1:],16)).encode('utf-8').decode('latin1'))
else: self.handle_data(unichr(int(ref)).encode('utf-8').decode('latin1'))
def handle_entityref(self, ref):
if ref in htmlentitydefs.name2codepoint:
self.handle_data(unichr(htmlentitydefs.name2codepoint[ref]).encode('utf-8').decode('latin1'))
else: self.handle_data(('&'+ref+';'))
def text(self): return u''.join(self.theTxt).strip()
def htmlStrings(html):
parser = HTMLStrings()
try:
parser.feed(UL(html)) ; parser.close()
if type(html)==type(u""): return parser.text(), ""
else: return parser.text().encode("latin1"), ""
except: return html, "\n- problem extracting strings from HTML at line %d offset %d\n%s" % (parser.getpos()+(traceback.format_exc(),)) # returning html might still work for 'was that text still there' queries; error message is displayed only if it doesn't
def main():
# 1 job per domain:
global jobs ; jobs = Queue.Queue()
for mainDomain,jobItems in read_input().items():
jobs.put((mainDomain,jobItems))
global previous_timestamps
try: previous_timestamps = pickle.Unpickler(open(".webcheck-last","rb")).load()
except: previous_timestamps = {}
old_previous_timestamps = previous_timestamps.copy()
for i in xrange(1,max_threads):
if jobs.empty(): break # enough are going already
thread.start_new_thread(worker_thread,())
worker_thread() ; jobs.join()
if previous_timestamps == old_previous_timestamps: return # no point saving if no changes
try: pickle.Pickler(open(".webcheck-last","wb")).dump(previous_timestamps)
except: sys.stdout.write("Problem writing .webcheck-last (progress was NOT saved):\n"+traceback.format_exc()+"\n")
def default_opener():
if sys.version_info >= (2,7,9) and not verify_SSL_certificates: opener = build_opener(HTTPCookieProcessor(),HTTPSHandler(context=ssl._create_unverified_context())) # HTTPCookieProcessor needed for some redirects
else: opener = build_opener(HTTPCookieProcessor())
opener.addheaders = [('User-agent', default_ua),
('Accept-Encoding', 'gzip')]
return opener
default_ua = 'Mozilla/5.0 or whatever you like (actually Webcheck)'
# you can override this on a per-site basis with "User-Agent: whatever"
# and undo again with "User-Agent:" on a line by itself.
# Please override sparingly or with webmaster permission.
# Let's not even mention it in the readme: we don't want to encourage
# people to hide their tools from webmasters unnecessarily.
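# Delayer enforces a minimum gap between successive fetches to the same domain group.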
class Delayer:
def __init__(self,mainDomain):
self.last_fetch_finished = 0
if mainDomain=="stackoverflow.com":
# (or other Stack Exchange sites as normalised to stackoverflow.com above) (although in 2024-12 I was able to confirm that Stack Exchange's own email notifications of others' edits to your answers _did_ work, so it might not be necessary to monitor for vandalism all your Stack Exchange answers from webcheck)
self.delay = 80 # seconds between fetches to these sites
else: self.delay = 3 # (want small if checking many pages and the server doesn't mind, but still non-0)
def wait(self):
time.sleep(max(0,self.last_fetch_finished+self.delay-time.time()))
if sys.stderr.isatty(): sys.stderr.write('.'),sys.stderr.flush()
def done(self): self.last_fetch_finished = time.time()
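# Each worker thread takes whole domains off the queue, so one Delayer covers all of a
# domain's URLs and the per-domain rate limit is respected; the urllib opener is
# created lazily, once per thread.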
def worker_thread(*args):
opener = [None]
while True:
try: mainDomain,job = jobs.get(False)
except: return # no more jobs left
try:
delayer = Delayer(mainDomain)
items = sorted(job.items()) # sorted will group http and https together
items.reverse()
while items:
url,checklist = items.pop()
if '\n' in url:
url = url.split('\n')
extraHeaders = url[1:] ; url = url[0]
else: extraHeaders = []
if (url,'lastFetch') in previous_timestamps and not '--test-all' in sys.argv: # (--test-all is different from removing .webcheck.last because it shouldn't also re-output old items in RSS feeds)
minDays = min(d[0] for d in checklist)
if minDays and previous_timestamps[(url,'lastFetch')]+minDays >= dayNo(): continue
oldPrev=previous_timestamps.get((url,'lastFetch'),None)
previous_timestamps[(url,'lastFetch')] = dayNo() # (keep it even if minDays==0, because that might be changed by later edits of webcheck.list)
try: r = doJob(opener,delayer,url,checklist,extraHeaders)
except CDNBackoff:
if oldPrev: previous_timestamps[(url,'lastFetch')] = oldPrev
break # skip all other items in this domain set
if r: # elseLogic yielded more items for this job (don't give to another thread, we need the same delayer as it might be retry on same URL)
r.reverse() ; items += r # try to keep pop() sequence in order
except Exception as e:
print ("Unhandled exception processing job "+repr(job))
print (traceback.format_exc())
jobs.task_done()
class CDNBackoff(Exception): pass
import threading ; lock = threading.Lock() # webcheck-debug
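# doJob fetches one URL (including the special schemes dns://, wd://, up://, e://,
# c://, blocks-lynx://, head:// and gemini://) and runs its checklist against the
# result; it returns the 'else:' rules to be tried next, either because a check
# produced an alert or because the fetch itself failed.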
def doJob(opener,delayer,url,checklist,extraHeaders):
failRet = [c[2] for c in checklist if c[2]]
delayer.wait()
if url.startswith("dns://"): # DNS lookup
try: u,content = None, B(' '.join(sorted(set('('+x[-1][0]+')' for x in socket.getaddrinfo(url[6:],1))))) # TODO this 'sorted' is lexicographical not numeric; it should be OK for most simple cases though (keeping things in a defined order so can check 2 or 3 IPs on same line if the numbers are consecutive and hold same number of digits). Might be better if parse and numeric sort
except: u,content=None,B("DNS lookup failed")
textContent = content
elif url.startswith("wd://"): # run webdriver (this type of url is set internally: see read_input)
ua = [e for e in extraHeaders if e.lower().startswith('user-agent:')]
if ua: ua=ua[0].split(':',1)[1].strip()
else: ua = default_ua
u,(content,wasError) = None, run_webdriver(ua,url[5:].split(chr(0)),not failRet)
if wasError: return failRet
textContent = None # parse 'content' if needed
url = url[5:].split(chr(0),1)[0] # for display
elif url.startswith("up://"): # just test if server is up, and no error if not
try:
if sys.version_info >= (2,7,9) and not verify_SSL_certificates: urlopen(url[5:],context=ssl._create_unverified_context(),timeout=60)
else: urlopen(url[5:],timeout=60)
u,content = None,B("yes")
except: u,content = None,B("no")
textContent = content
elif url.startswith("e://"): # run edbrowse
from subprocess import Popen,PIPE
try: from subprocess import TimeoutExpired # Python 3.3+
except: TimeoutExpired = None
edEnv=os.environ.copy() ; edEnv["TMPDIR"]=getoutput("(TMPDIR=/dev/shm mktemp -d -t edXXXXXX || mktemp -d -t edXXXXXX) 2>/dev/null") # ensure unique cache dir if we're running several threads (TODO: what about edbrowse 3.7.6 and below, which hard-codes a single cache dir in /tmp: had we better ensure only one of these is run at a time, just in case? 3.7.7+ honours TMPDIR)
try: child = Popen(["edbrowse","-e"],-1,stdin=PIPE,stdout=PIPE,stderr=PIPE,env=edEnv)
except OSError:
print ("webcheck misconfigured: couldn't run edbrowse")
return # no need to update delayer, and probably no need to return failRet if it's an edbrowse misconfiguration
edcmd = B("b "+url[4:].replace('\\','\n')+"\n,p\nqt\n") # but this isn't really the page source (asking edbrowse for page source would be equivalent to fetching it ourselves; it doesn't tell us the DOM)
u = None
if TimeoutExpired:
try: content,stderr = child.communicate(edcmd,60)
except TimeoutExpired:
child.kill() ; content,stderr = child.communicate()
else: content,stderr = child.communicate(edcmd)
try:
import shutil
shutil.rmtree(edEnv["TMPDIR"])
except: pass
if child.returncode:
if not failRet:
print ("edbrowse failed on "+url)
# Most likely the failure was some link didn't exist when it should have, so show the output for debugging
print ("edbrowse output was: "+repr(content)+"\n")
delayer.done() ; return failRet
textContent = content.replace(B('{'),B(' ')).replace(B('}'),B(' ')) # edbrowse uses {...} to denote links
url = url[4:].split('\\',1)[0] # for display
elif url.startswith("c://"): # run command
content = getoutput(url[len("c://"):])
u = textContent = None
elif url.startswith("blocks-lynx://"):
r=Request(url[len("blocks-lynx://"):])
r.get_method=lambda:'HEAD'
r.add_header('User-agent','Lynx/2.8.9dev.4 libwww-FM/2.14')
u,content = None,B("no") # not blocking Lynx?
try: urlopen(r,timeout=60)
except Exception as e:
if type(e) in [HTTPError,socket.error,socket.timeout,ssl.SSLError]: # MIGHT be blocking Lynx (SSLError can be raised if hit the timeout), check:
r.add_header('User-agent',default_ua)
try:
urlopen(r,timeout=60)
content = B("yes") # error ONLY with Lynx, not with default UA
except Exception as e:
print ("Info: "+url+" got "+str(type(e))+" even without Lynx header, so not flagging as Lynx-blocking")
try: print (e.message)
except: pass
else:
print ("Info: "+url+" got "+str(type(e))+" (check the server exists at all?)")
try: print (e.message)
except: pass
textContent = content
elif url.startswith("head://"):
r=Request(url[len("head://"):])
r.get_method=lambda:'HEAD'
for h in extraHeaders: r.add_header(*tuple(x.strip() for x in h.split(':',1)))
if not any(h.lower().startswith("user-agent:") for h in extraHeaders): r.add_header('User-agent',default_ua)
u=None
if sys.version_info >= (2,7,9) and not verify_SSL_certificates: content=textContent=B(str(urlopen(r,context=ssl._create_unverified_context(),timeout=60).info()))
else: content=textContent=B(str(urlopen(r,timeout=60).info()))
elif url.startswith("gemini://"):
u = None
content,textContent = get_gemini(url)
else: # normal URL
if opener[0]==None: opener[0] = default_opener()
u,content = tryRead(url,opener[0],extraHeaders,all(t[1] and not t[1].startswith('#') for t in checklist)) # don't monitorError for RSS feeds (don't try to RSS-parse an error message)
textContent = None
delayer.done()
if content==None: return # not modified (so nothing to report), or problem retrieving (which will have been reported by tryRead0: TODO: return failRet in these circumstances so elseLogic can proceed?)
if B(content).startswith(B("<!DOCTYPE html>")) or B(content).startswith(B('<html lang=')): # check for CloudFlare backoff
textContent,errmsg=htmlStrings(content)
if B(textContent).startswith(B("Just a moment...")) and B("Enable JavaScript and cookies to continue") in B(textContent): raise CDNBackoff() # might not be able to check the rest of this domain today
if u:
lm = u.info().get("Last-Modified",None)
if lm: previous_timestamps[(url,'lastMod')] = lm
if keep_etags:
e = u.info().get("ETag",None)
if e: previous_timestamps[(url,'ETag')] = e
toRet = [] ; errmsg = ""
for item in checklist:
t = item[1]
if t.startswith('>'):
out=check(t[1:],content,"Source of "+url,"")
elif not t or t.startswith('#'):
parseRSS(url,content,t.replace('#','',1).strip())
out = None # (no else: after real RSS, but you can do else: for no extracted items: that goes through check() above)
else:
if textContent==None:
textContent,errmsg=htmlStrings(content)
out=check(t,textContent,url,errmsg)
if out: # something to alert, unless else:
if item[2]: toRet.append(item[2]) # else:
else: sys.stdout.write(out) # don't use 'print' or may have problems with threads
return toRet
class NoTracebackException(Exception):
def __init__(self,message): self.message = message
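# run_webdriver starts a headless browser (Firefox, falling back to Chrome and then
# PhantomJS) and runs the action list, returning (collected page source, wasError).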
def run_webdriver(ua,actionList,reportErrors):
global webdriver # so run_webdriver_inner has it
try: from selenium import webdriver
except:
print ("webcheck misconfigured: can't import selenium (did you forget to set PYTHONPATH?)")
return B(""), True
try:
from selenium.webdriver.firefox.options import Options
options = Options() ; options.headless = True
browser = webdriver.Firefox(options=options)
except Exception as eFfx: # probably no Headless Firefox, try Headless Chrome
try:
from selenium.webdriver.chrome.options import Options
opts = Options()
opts.add_argument("--headless")
opts.add_argument("--disable-gpu")
opts.add_argument("--user-agent="+ua)
try: from inspect import getfullargspec as getargspec # Python 3
except ImportError:
try: from inspect import getargspec # Python 2
except ImportError: getargspec = None
try: useOptions = 'options' in getargspec(webdriver.chrome.webdriver.WebDriver.__init__).args
except: useOptions = False
if useOptions: browser = webdriver.Chrome(options=opts)
else: browser = webdriver.Chrome(chrome_options=opts)
except Exception as eChrome: # probably no HeadlessChrome, try PhantomJS
os.environ["QT_QPA_PLATFORM"]="offscreen"
sa = ['--ssl-protocol=any']
if not verify_SSL_certificates: sa.append('--ignore-ssl-errors=true')
try: browser = webdriver.PhantomJS(service_args=sa,service_log_path=os.path.devnull)
except Exception as ePJS:
print ("webcheck misconfigured: can't create Headless Firefox (%s), Headless Chrome (%s) or PhantomJS (%s). Check installation. (PATH=%s, cwd=%s, webdriver version %s)" % (str(eFfx),str(eChrome),str(ePJS),repr(os.environ.get("PATH","")),repr(os.getcwd()),repr(webdriver.__version__)))
return B(""), True
r = "" ; wasError = False
try: r = run_webdriver_inner(actionList,browser)
except CDNBackoff:
browser.quit() ; raise
except NoTracebackException as e:
if reportErrors: print (e.message)
else: wasError = True
except:
if reportErrors: print (traceback.format_exc())
else: wasError = True
browser.quit()
return r,wasError
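# Webdriver actions, as split out by read_input/balanceBrackets:
#   http...        load the URL
#   "text"         wait for text to appear in the page source
#   [spec]         click an element
#   spec=value     type into an input box
#   spec->"value"  choose an option in a selection box ("" to deselect all)
#   spec*1 / *0    tick / untick a checkbox
#   /.class/secs (or /idprefix/secs)  click each matching element in turn,
#                  grabbing the source after each click
#   N (digits)     sleep N seconds
# where spec is #id (falling back to name), .class (or .class#N for the Nth match),
# or the link text.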
def run_webdriver_inner(actionList,browser):
browser.set_window_size(1024, 768)
browser.implicitly_wait(2) # we have our own 'wait for text' and delay values, so the implicit wait does not have to be too high
def findElem(spec):
if spec.startswith('#'):
try: return browser.find_element_by_id(spec[1:])
except: return browser.find_element_by_name(spec[1:])
elif spec.startswith('.'):
if '#' in spec: return browser.find_elements_by_class_name(spec[1:spec.index('#')])[int(spec.split('#')[1])-1] # .class#1, .class#2 etc to choose the Nth element of that class
else: return browser.find_element_by_class_name(spec[1:])
else: return browser.find_element_by_link_text(spec)
def getSrc():
def f(b,switchBack=[]):
try: src = b.find_element_by_xpath("//*").get_attribute("outerHTML")
except: return u"getSrc webdriver exception but can retry" # can get timing-related WebDriverException: Message: Error - Unable to load Atom 'find_element'
for el in ['frame','iframe']:
for frame in b.find_elements_by_tag_name(el):
try: b.switch_to.frame(frame)
except: # StaleElementReferenceException is possible for some reason
src += "(Unable to switch to frame "+str(frame)+") "
continue
src += f(b,switchBack+[frame])
b.switch_to.default_content()
for fr in switchBack: b.switch_to.frame(fr)
return src
return f(browser).encode('utf-8')
snippets = []
for a in actionList:
if a.startswith('http'):
try: browser.get(a)
except: raise NoTracebackException("webdriver low-level timeout or error fetching "+a) # e.g. selenium.common.exceptions.TimeoutException
elif a.startswith('"') and a.endswith('"'):
# wait for "string" to appear in the source
tries = 30
while tries and not myFind(a[1:-1],getSrc()):
time.sleep(2) ; tries -= 1
if not tries:
src = getSrc() ; h=B(htmlStrings(src)[0])
if h.startswith(B("Just a moment...")) and B("Enable JavaScript and cookies to continue") in h: raise CDNBackoff() # 2024-07: can occur even in webdriver, even though that has JS + cookies
try: current_url = browser.current_url
except: current_url = "(unable to obtain)"
raise NoTracebackException("webdriver timeout while waiting for %s, current URL is %s content \"%s\"\n" % (repr(a[1:-1]),current_url,repr(src))) # don't quote current URL: if the resulting email is viewed in (at least some versions of) MHonArc, a bug can result in " being added to the href
elif a.startswith('[') and a.endswith(']'): # click
findElem(a[1:-1]).click()
elif a.startswith('/') and '/' in a[1:]: # click through items in a list to reveal each one (assume w/out Back)
start = a[1:a.rindex('/')]
delayAfter = a[a.rindex('/')+1:]
curNo,startNo,endNo = 0,1,0
propagate_errors = False
if ':' in delayAfter:
delayAfter,rest = delayAfter.split(':')
if rest.endswith('!'):
propagate_errors = True
rest = rest[:-1]
if '-' in rest:
startNo,endNo = rest.split('-')
startNo,endNo = int(startNo),int(endNo)
else: assert 0, "don't know how to parse "+rest
try: delayAfter = int(delayAfter)
except: delayAfter = 1
if start.startswith('.'):
startClass = start[1:]
if '.' in startClass: startClass,closeClass = startClass.split('.')
else: closeClass = None
if startNo>1 and sys.stderr.isatty(): sys.stderr.write('(skip %d)' % (startNo-1)),sys.stderr.flush()
for m in browser.find_elements_by_class_name(startClass):
curNo += 1
if curNo < startNo: continue
if endNo and curNo > endNo: break
try:
m.click()
if sys.stderr.isatty(): sys.stderr.write('*'),sys.stderr.flush()
except:
if sys.stderr.isatty(): sys.stderr.write('?'),sys.stderr.flush()
if propagate_errors: raise NoTracebackException(a+" failed to open instance "+str(curNo)+" (error because ! is set)")
else: continue
time.sleep(delayAfter)
snippets.append(getSrc())
if closeClass:
l = list(browser.find_elements_by_class_name(closeClass))
for c in l:
try:
c.click()
if sys.stderr.isatty(): sys.stderr.write('x'),sys.stderr.flush()
if not browser.find_elements_by_class_name(closeClass)==l: break # it did something
except: pass # maybe it wasn't that one
time.sleep(delayAfter)
else:
l = re.findall(B(' [iI][dD] *="('+re.escape(start)+'[^"]*)'),getSrc()) + re.findall(B(' [iI][dD] *=('+re.escape(start)+'[^"> ]*)'),getSrc())
for m in l:
curNo += 1
if curNo < startNo: continue
if endNo and curNo > endNo: break
try:
browser.find_element_by_id(m).click()
if sys.stderr.isatty(): sys.stderr.write('*'),sys.stderr.flush() # webdriver's '.' for click-multiple
except:
if sys.stderr.isatty(): sys.stderr.write('?'),sys.stderr.flush()
if propagate_errors: raise NoTracebackException(a+" failed to open instance "+str(curNo)+" (error because ! is set)")
else: continue
time.sleep(delayAfter)
snippets.append(getSrc())
elif '->' in a: # set a selection box
spec, val = a.split('->',1)
e = webdriver.support.ui.Select(findElem(spec))
if val.startswith('"') and val.endswith('"'): val=val[1:-1]
if val: e.select_by_visible_text(val)
else: e.deselect_all()
elif a.endswith('*0'): # clear a checkbox
e = findElem(a[:-2])
if e.is_selected(): e.click()
elif a.endswith('*1'): # check a checkbox
e = findElem(a[:-2])
if not e.is_selected(): e.click()
elif '=' in a: # put text in an input box
spec, val = a.split('=',1)
if val.startswith('"') and val.endswith('"'): val=val[1:-1]
findElem(spec).send_keys(val)
elif re.match("[0-9]+$",a): time.sleep(int(a))
else: sys.stdout.write("Ignoring webdriver unknown action "+repr(a)+'\n')
if sys.stderr.isatty(): sys.stderr.write(':'),sys.stderr.flush() # webdriver's '.'
time.sleep(2)
snippets.append(getSrc())
return B('\n').join(snippets)
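# Minimal Gemini protocol fetch: TLS to port 1965 (or the :port in the URL), send the
# URL, read the "STATUS META" header; 2x = success, 3x = redirect (followed up to 9
# deep).  Returns (raw body, plain-text version or None if the body is HTML).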
def get_gemini(url,nestLevel=0):
if nestLevel > 9: return B("Too many redirects"),B("Too many redirects")
url = B(url)
host0 = host = re.match(B("gemini://([^/?#]*)"),url).groups(1)[0]
port = re.match(B(".*:([0-9]+)$"),host)
if port:
port = int(port.groups(1)[0])
host = host[:host.rindex(B(":"))]
else: port = 1965
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.settimeout(60) ; s.connect((host,port))
try: protocol = ssl.PROTOCOL_TLS_CLIENT # Python 3.6+
except: protocol=ssl.PROTOCOL_TLS
ss = ssl.SSLContext(protocol)
ss.check_hostname,ss.verify_mode = False,ssl.CERT_NONE
s=ss.wrap_socket(s,server_hostname=host)
s.send(url+B("\r\n")) ; g=[]
while not g or g[-1]: g.append(s.recv())
s.close() ; g=B("").join(g)
if B("\r\n") in g:
header,body = g.split(B("\r\n"),1)
else: header,body = g,B("")
if B(" ") in header: status,meta = header.split(B(" "),1)
else: status,meta = B("?"),header
try: status = int(status)
except: status = 0
if 20 <= status <= 29:
if meta.startswith(B("text/gemini")):
txtonly = re.sub(B("\n *=> +[^ ]*"),B("\n"),body)
elif B("html") in meta: txtonly = None # will result in htmlStrings
else: txtonly = body
return body,txtonly
elif 30 <= status <= 39:
if meta.startswith(B("gemini://")):
return get_gemini(meta,nestLevel+1)
elif meta.startswith(B("/")):
return get_gemini(B("gemini://")+host0+meta,nestLevel+1)
else: return get_gemini(url[:url.rindex(B("/"))+1]+meta,nestLevel+1) # TODO: handle ../ ourselves? or let server do it? (early protocol specification and practice unclear)
else: return meta,meta # input prompt, error message, or certificate required
def dayNo(): return int(time.mktime(time.localtime()[:3]+(0,)*6))/(3600*24)
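# tryRead fetches a normal http(s) URL, sending If-Modified-Since (and If-None-Match
# if keep_etags) from the previous run, and follows simple <meta refresh> redirects;
# returns (response object or None, content or None if unchanged/unavailable).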
def tryRead(url,opener,extraHeaders,monitorError=True,refreshTry=5):
oldAddHeaders = opener.addheaders[:]
for h in extraHeaders:
if h.lower().startswith("user-agent") and opener.addheaders[0][0]=="User-agent": del opener.addheaders[0] # User-agent override (will be restored after by oldAddHeaders) (TODO: override in run_webdriver also)
opener.addheaders.append(tuple(x.strip() for x in h.split(':',1)))
if (url,'lastMod') in previous_timestamps and not '--test-all' in sys.argv:
opener.addheaders.append(("If-Modified-Since",previous_timestamps[(url,'lastMod')]))
if keep_etags and (url,'ETag') in previous_timestamps and not '--test-all' in sys.argv:
        opener.addheaders.append(("If-None-Match",previous_timestamps[(url,'ETag')])) # (use the stored ETag, not Last-Modified, for If-None-Match)
ret = tryRead0(url,opener,monitorError)
opener.addheaders = oldAddHeaders
if refreshTry: # meta refresh redirects
u,content = ret
if content: m = re.search(br'(?is)<head>.*?<meta http-equiv="refresh" content="0; *url=([^"]*)".*?>.*?</head>',content) # TODO: if string found, remove comments and re-check (or even parse properly) ?
else: m = None # content==None if 304 not modified
if m:
m = m.groups(1)[0]
if type(u"")==type(""): m=m.decode('latin1')
return tryRead(urlparse.urljoin(url,m),opener,extraHeaders,monitorError,refreshTry-1)
return ret
def tryRead0(url,opener,monitorError):
url = re.sub("[^!-~]+",lambda m:quote(m.group()),url) # it seems some versions of the library do this automatically but others don't
u = None
try:
u = opener.open(url,timeout=60)
return u,tryGzip(u.read())
except HTTPError as e:
if e.code==304: return None,None # not modified
elif monitorError: return None,tryGzip(e.fp.read()) # as might want to monitor some phrase on a 404 page
sys.stdout.write("Error "+str(e.code)+" retrieving "+linkify(url)+"\n") ; return None,None
except: # try it with a fresh opener and no headers
try:
if sys.version_info >= (2,7,9) and not verify_SSL_certificates: u = build_opener(OurRedirHandler(),HTTPCookieProcessor(),HTTPSHandler(context=ssl._create_unverified_context())).open(url,timeout=60)
else: u = build_opener(OurRedirHandler(),HTTPCookieProcessor()).open(url,timeout=60)
return u,tryGzip(u.read())
except HTTPError as e:
if monitorError: return u,tryGzip(e.fp.read())
sys.stdout.write("Error "+str(e.code)+" retrieving "+linkify(url)+"\n") ; return None,None
except URLError as e: # don't need full traceback for URLError, just the message itself
sys.stdout.write("Problem retrieving "+linkify(url)+"\n"+str(e)+"\n")
return None,None
except socket.timeout:
sys.stdout.write("Timed out retrieving "+linkify(url)+"\n")
return None,None
except: # full traceback by default
sys.stdout.write("Problem retrieving "+linkify(url)+"\n"+traceback.format_exc())
return None,None
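# Fallback redirect handler used when the plain open above fails: it percent-escapes
# the Location header itself (not all library versions do) and follows up to 9
# nested redirects.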
class OurRedirHandler(HTTPErrorProcessor):
def __init__(self,nestLevel=0): self.nestLevel = nestLevel
def our_response(self,request,response,prefix):
try: code=response.code
except: return response
if code not in [301,302,303,307,308]: return response
url = re.sub("[^!-~]+",lambda m:quote(m.group()),response.headers['Location']) # not all versions of the library do this, so we'll do it here if simple-open failed
if self.nestLevel>9: raise Exception("too many redirects")
if url.startswith("//"): url=prefix+url
elif url.startswith("/"): url=urlparse.urljoin(request.get_full_url(),url)
if sys.version_info >= (2,7,9) and not verify_SSL_certificates: return build_opener(OurRedirHandler(self.nestLevel+1),HTTPCookieProcessor(),HTTPSHandler(context=ssl._create_unverified_context())).open(url,timeout=60)
else: return build_opener(OurRedirHandler(self.nestLevel+1),HTTPCookieProcessor()).open(url,timeout=60)
def http_response(self,request,response):
return self.our_response(request,response,"http:")
def https_response(self,request,response):
return self.our_response(request,response,"https:")
def tryGzip(t):
try: return gzip.GzipFile('','rb',9,StringIO(t)).read()
except: return t
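# check() applies one text-matching rule: plain text = alert if the page no longer
# contains it, !text = alert if it DOES contain it, {start...end} = extract whatever
# lies between the markers and report it like feed items; an optional " #comment" is
# echoed in the alert, and a leading * makes myFind (below) treat the text as a regexp.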
def check(text,content,url,errmsg):
if ' #' in text: text,comment = text.split(' #',1) # (comments must be preceded by a space, otherwise interpreted as part of the text as this is sometimes needed in codes)
else: comment = ""
orig_comment = comment = comment.strip()
if comment: comment="\n "+paren(comment)
text = text.strip()
assert text # or should have gone to parseRSS instead
if text.startswith('{') and text.endswith('}') and '...' in text: return extract(url,content,text[1:-1].split('...'),orig_comment)
elif text.startswith("!"): # 'not', so alert if DOES contain
if len(text)==1: return # TODO: print error?
if myFind(text[1:],content):
return url+" contains "+text[1:]+comment+errmsg+"\n"
elif not myFind(text,content): # alert if DOESN'T contain
r=linkify(url)+" no longer contains "+text+comment+errmsg+"\n"
if '??show?' in orig_comment: writeBuf(sys.stdout,B("Debug: contents of "+linkify(url)+" is:\n")+B(content)+B('\n')) # TODO: document this
return r
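# parseRSS parses an RSS/Atom feed with expat, collecting title / link / description
# (or summary) / pubDate for each item or entry, then hands them to handleRSS.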
def parseRSS(url,content,comment):
from xml.parsers import expat
parser = expat.ParserCreate()
items = [[[],[],[],[]]] ; curElem = [None]
def StartElementHandler(name,attrs):
if name in ['item','entry']: items.append([[],[],[],[]])
if name=='title': curElem[0]=0
elif name=='link': curElem[0]=1
elif name in ['description','summary']: curElem[0]=2
elif name=='pubDate': curElem[0]=3
else: curElem[0]=None
if name=='link' and 'href' in attrs: # (note this isn't the ONLY way an href could get in: <link>http...</link> is also possible, and is handled by CharacterDataHandler below, hence EndElementHandler is important for separating links)
items[-1][curElem[0]].append(attrs['href']+' ')
def EndElementHandler(name):
if name in ['item','entry']: # ensure any <link>s outside <item>s are separated
items.append([[],[],[],[]])
curElem[0]=None
elif name in ['description','summary','title','link']:
if not curElem[0]==None: items[-1][curElem[0]].append(' ') # ensure any additional ones are space-separated
curElem[0]=None
def CharacterDataHandler(data):
if data and not curElem[0]==None:
items[-1][curElem[0]].append(data)
parser.StartElementHandler = StartElementHandler
parser.EndElementHandler = EndElementHandler
parser.CharacterDataHandler = CharacterDataHandler
if type(u"")==type("") and not type(content)==type(""): content = content.decode("utf-8") # Python 3 (expat needs 'strings' on each platform)
try: parser.Parse(re.sub("&[A-Za-z]*;",entityref,content),1)
except expat.error as e: sys.stdout.write("RSS parse error in "+url+paren(comment)+":\n"+repr(e)+"\n(You might want to check if this URL is still serving RSS)\n\n") # and continue with handleRSS ? (it won't erase our existing items if the new list is empty, as it will be in the case of the parse error having been caused by a temporary server error)
for i in xrange(len(items)):
items[i][1] = "".join(urlparse.urljoin(url,w) for w in "".join(items[i][1]).strip().split()).strip() # handle links relative to the RSS itself
for j in [0,2,3]: items[i][j]=re.sub(r"\s+"," ",u"".join(U(x) for x in items[i][j])).strip()
handleRSS(url,items,comment)
def entityref(m):
m=m.group()[1:-1] ; m2 = None
try: m2=unichr(htmlentitydefs.name2codepoint[m])
except:
try:
if m.startswith("#x"): m2=unichr(int(m[2:],16))
elif m.startswith("#"): m2=unichr(int(m[1:]))
except: pass
if m2 and not m2 in "<>&":
if type(u"")==type(""): return m2
else: return m2.encode('utf-8')
return "&"+m+";"
def paren(comment):
comment = " ".join(comment.replace("??track-links-only?","").split())
if not comment or (comment.startswith('(') and comment.endswith(')')): return comment
else: return " ("+comment+")"
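# handleRSS reports feed items not seen before (remembered as an md5 of title, link
# and text in previous_timestamps) and forgets items that have dropped out of the feed.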
def handleRSS(url,items,comment,itemType="RSS/Atom"):
newItems = [] ; pKeep = set()
for title,link,txt,date in items:
if not title: continue # valid entry must have title
if "??track-links-only?" in comment: hashTitle,hashTxt = date,"" # TODO: document this, it's for when text might change because for example we're fetching it through an add-annotation CGI that can change, but don't ignore if the publication date has changed due to an update (TODO: might be better to do this via a 'pipe to postprocessing' option instead?)
else: hashTitle,hashTxt = title,re.sub("</?[A-Za-z][^>]*>","",txt) # (ignore HTML markup in RSS, since it sometimes includes things like renumbered IDs)
k = (url,'seenItem',hashlib.md5(repr((hashTitle,link,hashTxt)).encode("utf-8")).digest()) # TODO: option not to call hashlib, in case someone has the space and is concerned about the small probability of hash collisions? (The Python2-only version of webcheck just used Python's built-in hash(), but in Python 3 that is no longer stable across sessions, so use md5)
pKeep.add(k)
if k in previous_timestamps and not '--show-seen-rss' in sys.argv: continue # seen this one already
previous_timestamps[k] = True
txt = re.sub("&#x([0-9A-Fa-f]*);",lambda m:unichr(int(m.group(1),16)),re.sub("&#([0-9]*);",lambda m:unichr(int(m.group(1))),txt)) # decode &#..; HTML entities (sometimes used for CJK), but leave < etc as-is (in RSS it would have originated with a double-'escaped' < within 'escaped' html markup)
txt = re.sub("</?[A-Za-z][^>]*>",simplifyTag,txt) # avoid overly-verbose HTML (but still allow some)
txt = re.sub("<[pPbBiIuUsS]></[pPbBiIuUsS]>","",txt).strip() # sometimes left after simplifyTag removes img
if txt: txt += '\n'
newItems.append(title+'\n'+txt+linkify(link))
if not pKeep: return # if the feed completely failed to fetch, don't erase what we have
for k in list(previous_timestamps.keys()):
if k[:2]==(url,'seenItem') and not k in pKeep:
del previous_timestamps[k] # dropped from the feed
if newItems: writeBuf(sys.stdout,(str(len(newItems))+" new "+itemType+" items in "+url+paren(comment)+' :\n'+'\n---\n'.join(n.strip() for n in newItems)+'\n\n').encode('utf-8'))
def simplifyAttr(match):
m = match.group()
if m.lower().startswith(" href="): return m
else: return ""
def simplifyTag(match):
m = match.group()
t = m.split()[0].replace('<','').replace('>','').replace('/','')
if t=='a': return re.sub(' [A-Za-z]+="[^"]*"',simplifyAttr,m)
elif t in ['p','br','em','strong','b','i','u','s']:
if ' ' in m: return m.split()[0]+'>' # strip attributes
else: return m
else: return "" # strip entire tag
def linkify(link): return S(link).replace("(","%28").replace(")","%29") # for email clients etc that terminate URLs at parens
def extract(url,content,startEndMarkers,comment):
assert len(startEndMarkers)==2, "Should have exactly one '...' between the braces when extracting items"
start,end = startEndMarkers
content,start,end = B(content),B(start),B(end)
i=0 ; items = []
while True:
i = content.find(start,i)
if i==-1: break
j = content.find(end,i+len(start))
if j==-1: break
c = content[i+len(start):j].decode('utf-8').strip()
if c: items.append(('Auto-extracted text:','',c,"")) # NB the 'title' field must not be empty (unless we relocate that logic to parseRSS instead of handleRSS)
i = j+len(end)
if not items: return ("No items were extracted from "+url+" via "+S(start)+"..."+S(end)+" (check that site changes haven't invalidated this extraction rule)")
else: handleRSS(url,items,comment,"extracted")
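# myFind: substring search, retried with punctuation and whitespace normalised;
# a leading * means the rest of the text is a regular expression.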
def myFind(text,content):
text,content = B(text),B(content)
if text[:1]==B("*"): return re.search(text[1:],content)
elif text in content: return True
return normalisePunc(text) in normalisePunc(content)
def normalisePunc(t):
"normalise apostrophes; collapse (but don't ignore) whitespace and ignore double-quotes because they might have been <Q> elements; fold case"
for s,r in [
(u"\u2013".encode('utf-8'),B("-")), # en-dash
(u"\u2019".encode('utf-8'),B("'")),
(u"\u2018".encode('utf-8'),B("'")),
(u"\u201C".encode('utf-8'),B("")),
(u"\u201D".encode('utf-8'),B("")),
(B('"'),B("")),
(u"\u00A0".encode('utf-8'),B(" ")),
(u"\uFEFF".encode('utf-8'),B("")),
(u"\u200B".encode('utf-8'),B(""))
]: t=t.replace(s,r)
return re.sub(B(r"(\s)\s+"),B(r"\1"),t).lower()
if __name__=="__main__":
if "--version" in sys.argv: print(__doc__)
elif "--help" in sys.argv: print("python webcheck.py [--single-thread] [--test-all] [--show-seen-rss] | --help | --version")
else: main()