-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathMostPopularSuperhero.py
54 lines (42 loc) · 1.62 KB
/
MostPopularSuperhero.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from mrjob.job import MRJob
from mrjob.step import MRStep
import mrjob.protocol
class MRMostPopularSuperhero(MRJob):
    """Find the hero(es) with the most connections in a hero graph.

    Two kinds of input lines are recognised and may be mixed freely:

    * ``heroID "HERO NAME"`` -- associates a hero ID with a name;
    * ``heroID otherID otherID ...`` -- lists the heroes connected to
      ``heroID``.

    Step 1 merges every line about a hero into a single record holding its
    total connection count (and its name, when one was seen) and keys that
    record by the count formatted as a fixed-width string.

    Step 2 remembers the group belonging to the last key handed to its
    reducer and emits that group once the reducer has finished.  This yields
    the most connected hero(es) provided keys reach the reducer in ascending
    order.
    """

    def __init__(self, *args, **kwargs):
        super(MRMostPopularSuperhero, self).__init__(*args, **kwargs)
        # (key, records) of the most recent group seen by reducer_get_last.
        self.last = (0, [])

    def steps(self):
        """Return the two steps of the job: merge per hero, then keep the max."""
        return [
            MRStep(mapper=self.mapper, reducer=self.reducer),
            # The explicit identity mapper works around an mrjob 0.4.6 bug:
            # https://github.com/Yelp/mrjob/issues/1141
            MRStep(mapper=self.mapper_identity,
                   reducer=self.reducer_get_last,
                   reducer_final=self.reducer_final),
        ]

    def mapper(self, key, line):
        """Turn one input line into a partial record for its hero.

        Yields ``(heroID, {'name': ...})`` for a name line and
        ``(heroID, {'count': n})`` for a connection line, where ``n`` is the
        number of whitespace-separated IDs following ``heroID``.  Blank lines
        yield nothing.
        """
        fields = line.split(None, 1)
        if not fields:
            # Blank / whitespace-only line: there is no hero to report.
            return

        # Taking the ID from the split result (rather than the raw line)
        # keeps stray whitespace out of the key, so "42 " and "42" merge.
        heroID = fields[0]
        tail = fields[1] if len(fields) > 1 else ""

        if tail[:1] == '"':
            # Assume this is a heroID -> heroName mapping.
            name = tail.rstrip()[1:]
            if name.endswith('"'):
                name = name[:-1]
            # Byte strings (Python 2 input) are decoded as latin1; text that
            # is already decoded is passed through unchanged.
            if isinstance(name, bytes):
                name = name.decode('latin1')
            yield heroID, {'name': name}
        else:
            # Assume this is a graph connection line.
            yield heroID, {'count': len(tail.split())}

    def reducer(self, heroID, cols):
        """Merge all partial records of one hero and key the result by count.

        Counts are summed; every other field (e.g. ``name``) is copied from
        the partial records.  The key is the count right-aligned in six
        characters, so string order matches numeric order for counts below
        1,000,000.
        """
        record = {'heroID': heroID}
        total = 0
        for col in cols:
            total += col.get('count', 0)
            record.update(col)
        # Assigned last so a partial record's own 'count' never overrides
        # the accumulated sum.
        record['count'] = total
        yield "{:6d}".format(total), record

    def reducer_get_last(self, *args):
        """Remember the current ``(key, values)`` group; emit nothing.

        ``values`` is a lazy iterator supplied by the framework, so it is
        copied into a list here.  Keeping the iterator itself would leave
        ``reducer_final`` with a partially or fully consumed group once the
        framework moved on to the next key.  Only one group is held at a
        time.
        """
        key, values = args
        self.last = (key, list(values))

    def mapper_identity(self, *args):
        """Pass the incoming ``(key, value)`` pair through unchanged."""
        yield args

    def reducer_final(self):
        """Emit every record of the last group seen by ``reducer_get_last``."""
        count, records = self.last
        for record in records:
            yield count, record
# Script entry point: when executed directly (not imported), hand control
# to the job's command-line runner.
if __name__ == "__main__":
    MRMostPopularSuperhero.run()