forked from commoncrawl/cc-pyspark
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcc_index_word_count.py
73 lines (58 loc) · 2.59 KB
/
cc_index_word_count.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from collections import Counter
from bs4 import BeautifulSoup
from bs4.dammit import EncodingDetector
from sparkcc import CCIndexWarcSparkJob
from word_count import WordCountJob
class CCIndexWordCountJob(WordCountJob, CCIndexWarcSparkJob):
    """ Word count (frequency list) from WARC records matching a SQL query
        on the columnar URL index """

    name = "CCIndexWordCount"

    # Accumulators; they stay None until init_accumulators() creates them.
    records_parsing_failed = None
    records_non_html = None

    def init_accumulators(self, session):
        """Create the accumulators of the parent classes and the two
        counters specific to this job (parse failures, non-HTML records)."""
        super(CCIndexWordCountJob, self).init_accumulators(session)

        sc = session.sparkContext
        self.records_parsing_failed = sc.accumulator(0)
        self.records_non_html = sc.accumulator(0)

    def log_accumulators(self, session):
        """Log the accumulators of the parent classes followed by the
        two counters added by this job."""
        super(CCIndexWordCountJob, self).log_accumulators(session)

        self.log_accumulator(session, self.records_parsing_failed,
                             'records failed to parse = {}')
        self.log_accumulator(session, self.records_non_html,
                             'records not HTML = {}')

    @staticmethod
    def reduce_by_key_func(a, b):
        """Merge two <term_frequency, document_frequency> tuples by
        summing them element-wise."""
        return ((a[0] + b[0]), (a[1] + b[1]))

    def html_to_text(self, page, record):
        """Convert the HTML payload `page` of `record` into plain text.

        The character encoding is taken from the WARC header
        'WARC-Identified-Content-Charset' if present, otherwise the first
        candidate proposed by EncodingDetector is used. <script> and
        <style> elements are removed before the text is extracted.

        Returns the extracted text, or the empty string if anything goes
        wrong; in that case the error is logged and the
        `records_parsing_failed` accumulator is incremented.
        """
        try:
            encoding = self.get_warc_header(record, 'WARC-Identified-Content-Charset')
            if not encoding:
                # take the first detected encoding, keep the (falsy)
                # header value if the detector proposes nothing
                detected = EncodingDetector(page, is_html=True).encodings
                encoding = next(iter(detected), encoding)
            soup = BeautifulSoup(page, 'lxml', from_encoding=encoding)
            for element in soup(['script', 'style']):
                element.extract()
            return soup.get_text(' ', strip=True)
        except Exception as e:
            # Render the message eagerly: a stdlib `logging` logger
            # interpolates extra arguments %-style, so a '{}' template
            # passed together with positional arguments is never filled in
            # and triggers a formatting error inside the logging handler.
            # NOTE(review): get_logger() is defined outside this file;
            # a pre-formatted message works with any logger it returns.
            self.get_logger().error(
                "Error converting HTML to text for {}: {}".format(
                    self.get_warc_header(record, 'WARC-Target-URI'), e))
            self.records_parsing_failed.add(1)
            return ''

    def process_record(self, record):
        """Yield `(word, (term_frequency, 1))` for every distinct
        lower-cased word found in the text of an HTML response record.

        Records which are not responses are skipped silently, non-HTML
        responses are skipped and counted in `records_non_html`.
        """
        if not self.is_response_record(record):
            # skip over WARC request or metadata records
            return
        if not self.is_html(record):
            self.records_non_html.add(1)
            return
        page = self.get_payload_stream(record).read()
        text = self.html_to_text(page, record)
        words = (w.lower() for w in WordCountJob.word_pattern.findall(text))
        # every word contributes its in-document count as term frequency
        # and 1 as document frequency
        for word, count in Counter(words).items():
            yield word, (count, 1)
if __name__ == "__main__":
    # Script entry point: instantiate the job and run it.
    word_count_job = CCIndexWordCountJob()
    word_count_job.run()