-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmost-popular-movie.py
executable file
·39 lines (31 loc) · 1.36 KB
/
most-popular-movie.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from pyspark import SparkConf, SparkContext
# You don't need the next 2 lines in the pyspark interactive shell:
# the shell already provides a ready-made SparkContext as `sc`.
conf = SparkConf().setMaster("local").setAppName("MostPopularMovie")
sc = SparkContext(conf = conf)
# For a better-written version, see aws-emr-jupiter-notebooks/most-popular-movie.ipynb:
# it uses collectAsMap() to turn an RDD directly into a dict, which is much more concise.
def loadMovieNames(path="./spark-sandbox/datasets/ml-100k/u.item"):
    """Build a {movie_id: movie_title} lookup from the MovieLens u.item file.

    Each u.item line is pipe-delimited:
        movie-id|movie-name|movie-date|imdb-url|0|0|0|1|...
    Only the first two fields are used. Undecodable bytes are ignored
    (the ml-100k files are not clean UTF-8).

    Args:
        path: location of the u.item file. Defaults to the original
              hard-coded location, so existing callers are unaffected.

    Returns:
        dict mapping int movie id -> str movie title.
    """
    movieNames = {}
    # `with` guarantees the handle is closed (original leaked it), and
    # iterating the file streams line-by-line instead of readlines().
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            fields = line.split("|")
            movieNames[int(fields[0])] = fields[1]
    return movieNames
# Ship the {movie_id: title} lookup to every executor once as a broadcast
# variable, so tasks can resolve ids to human-readable names cheaply.
nameDictionary = sc.broadcast(loadMovieNames())

# u.data columns (whitespace separated): user-id movie-id rating timestamp
# NOTE(review): this path omits the "datasets/" segment that the u.item
# path above includes — confirm which directory layout is correct.
lines = sc.textFile("./spark-sandbox/ml-100k/u.data")

# Pipeline, unrolled into named stages:
#   1. keep only the movie id, paired with a count of 1
#   2. sum the counts per movie id
#   3. flip to (count, id) so sortByKey orders by popularity (ascending)
#   4. swap the id for the broadcast title
perMovieOnes = lines.map(lambda record: (int(record.split()[1]), 1))
perMovieCounts = perMovieOnes.reduceByKey(lambda left, right: left + right)
countKeyed = perMovieCounts.map(lambda kv: (kv[1], kv[0]))
movies = (
    countKeyed.sortByKey()
    .map(lambda kv: (nameDictionary.value[kv[1]], kv[0]))
    .collect()
)

# Ascending sort means the most popular movie prints last.
for key, value in movies:
    # movie-name popularity
    print("%s %i" % (key, value))