# preprocessing_for_LDA.py
import re  # needed to remove special characters
from pyspark.sql import SQLContext, Row
from pyspark.sql import functions as F
from pyspark.sql.functions import array_contains, udf
from pyspark.sql.types import StructType, StructField, LongType, ArrayType, StringType
from pyspark.ml.feature import Tokenizer, CountVectorizer, StopWordsRemover
from pyspark.mllib.clustering import LDA
from nltk.stem.porter import PorterStemmer
sqlContext = SQLContext(sc)  # sc is the SparkContext provided by the pyspark shell
## loading the review and business jsons
mydir = 'file:/vagrant/project/file/review.json'
df1 = sqlContext.read.json(mydir)
mydir2 = 'file:/vagrant/project/file/business.json'
df2 = sqlContext.read.json(mydir2)
## joining the business and review jsons on business_id
combined_df = df1.join(df2, df1.business_id == df2.business_id).drop(df2.business_id)
## filtering all the Chinese cuisine places from the dataset.
## You can similarly filter out different cuisines.
asian = combined_df.where(
    array_contains(combined_df.categories, "Chinese") |
    array_contains(combined_df.categories, "Cantonese") |
    array_contains(combined_df.categories, "Taiwanese") |
    array_contains(combined_df.categories, "Szechuan"))
## creating a temp table so the filtered reviews can be queried with SQL
asian.registerTempTable('asian')
asiandf = sqlContext.sql('select business_id, text from asian')
## converting to an RDD of (business_id, text) rows
asianrdd = asiandf.rdd
## stripping special characters, underscores, and words shorter than 3 characters from the review text
pattern1 = re.compile(r'\W+|\W+$|[^\w\s]+|_')
pattern2 = re.compile(r'\W*\b\w{1,2}\b')
rdd = asianrdd \
    .mapValues(lambda x: pattern1.sub(' ', x)) \
    .mapValues(lambda x: pattern2.sub(' ', x))
## the 'file' column holds the business_id, 'text' the cleaned review
df = rdd.toDF(schema=['file', 'text'])
## assigning each review a unique numeric id so every document in the LDA corpus can be identified
row_with_index = Row(*["id"] + df.columns)

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)
indexed = (df.rdd
           .zipWithUniqueId()
           .map(lambda x: f(*x))
           .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
## tokenizing the reviews, removing stopwords, stemming and storing the results in a dataframe
# tokenize
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
tokenized = tokenizer.transform(indexed)
print('done')
# remove stop words
stopwordList = ['','get','got','also','really','would','one','good','like','great','tri','love','two','three','took','awesome',
'me','bad','horrible','disgusting','terrible','fabulous','amazing','terrific','worst','best','fine','excellent','acceptable',
'my','exceptional','satisfactory','satisfying','super','awful','atrocious','unacceptable','poor','sad','gross','authentic',
'myself','cheap','expensive','we', 'our', 'ours', 'ourselves', 'you', 'your', 'yours', 'yourself', 'yourselves', 'he', 'him',
'his', 'himself', 'she', 'her', 'hers', 'herself', 'it', 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what',
'which', 'who', 'whom', 'this', 'that', 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has',
'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of',
'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to',
'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where',
'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same',
'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', 'should', 'now', 'd', 'll', 'm', 'o', 're', 've', 'y', 'ain',
'aren', 'couldn', 'didn', 'doesn', 'hadn', 'hasn', 'haven', 'isn', 'ma', 'mightn', 'mustn', 'needn', 'shan', 'shouldn', 'wasn',
'weren', 'won', 'wouldn']
remover = StopWordsRemover(inputCol="tokens", outputCol="words", stopWords=stopwordList)
cleaned = remover.transform(tokenized)
print('done')
# stem words
# Instantiate the NLTK Porter stemmer
stemmer = PorterStemmer()

# Stem each token, keeping only stems longer than 2 characters
def stem(in_vec):
    out_vec = []
    for t in in_vec:
        t_stem = stemmer.stem(t)
        if len(t_stem) > 2:
            out_vec.append(t_stem)
    return out_vec

# Create user defined function for stemming with return type Array<String>
stemmer_udf = udf(lambda x: stem(x), ArrayType(StringType()))
# Create new df with a column containing the stemmed tokens
vector_stemmed_df = (
    cleaned
    .withColumn("vector_stemmed", stemmer_udf("words"))
)
# vectorize: build term-count vectors from the stemmed tokens
cv = CountVectorizer(inputCol="vector_stemmed", outputCol="vectors")
print('done')
count_vectorizer_model = cv.fit(vector_stemmed_df)
print('done')
result = count_vectorizer_model.transform(vector_stemmed_df)
## pairing each document id with its term-count vector, the [id, vector] format expected by mllib LDA
corpus = result.select(F.col('id').cast("long"), 'vectors').rdd \
    .map(lambda x: [x[0], x[1]])
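## A minimal sketch of how the corpus built above could feed the mllib LDA imported at the top
## of this file. The parameter values (k, maxIterations, maxTermsPerTopic) are illustrative,
## not taken from the original script. On newer Spark versions the ml CountVectorizer emits
## ml.linalg vectors, which would need converting (pyspark.mllib.linalg.Vectors.fromML) before
## being passed to the mllib LDA.
lda_model = LDA.train(corpus, k=10, maxIterations=50)  # fit 10 topics over the review corpus
topics = lda_model.describeTopics(maxTermsPerTopic=10)  # top term indices and weights per topic
vocab = count_vectorizer_model.vocabulary  # maps term indices back to the stemmed vocabulary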