evaluator.py
# Batch job to evaluate the classification model
# Extract TF-IDF features using Spark with the saved pipeline stages, then evaluate the saved Naive Bayes classifier
import logging
try:
    import ConfigParser  # Python 2
except ImportError:
    import configparser as ConfigParser  # Python 3
import pandas as pd
import csv
import atexit
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, IDFModel
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import NaiveBayes, NaiveBayesModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import udf, col
logging.basicConfig()
logger=logging.getLogger('model_evaluate')
logger.setLevel(logging.DEBUG)
config=ConfigParser.ConfigParser()
config.read('model_generation.cfg')
master=config.get('spark','master')
posts_file=config.get('io', 'post_file')
tags_file=config.get('io', 'tags_file')
selected_tags_file=config.get('io', 'selected_tags_file')
idf_model_file=config.get('io','idf_model_file')
nb_model_file=config.get('io','nb_model_file')
hashing_tf_file=config.get('io', 'hashing_tf_file')
tokenizer_file=config.get('io', 'tokenizer_file')
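
# Expected layout of model_generation.cfg (illustrative sketch only; the section and key names
# below come from the config.get calls above, the values are placeholders for the deployment):
#
# [spark]
# master = local[*]
#
# [io]
# post_file = posts.json
# tags_file = tags.csv
# selected_tags_file = selected_tags.csv
# idf_model_file = idf_model
# nb_model_file = nb_model
# hashing_tf_file = hashing_tf
# tokenizer_file = tokenizer
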
def shutdown_hook(spark_session):
    try:
        spark_session.stop()
        logger.debug("Successfully stopped the Spark session and Spark context")
    except Exception:
        logger.debug("Failed to stop the Spark session and Spark context")
if __name__ == '__main__':
    # Try to initialize a Spark cluster with the given master; the master can be local or a Mesos URL, configurable in the config file
    try:
        logger.debug("Initializing Spark cluster")
        conf=SparkConf().setAppName('model_generation').setMaster(master)
        sc=SparkContext(conf=conf)
        logger.debug("Created Spark cluster successfully")
    except Exception:
        logger.error("Failed to initialize Spark cluster")

    try:
        spark=SparkSession.builder.config(conf=conf).getOrCreate()
        logger.debug("Initialized Spark session successfully")
    except Exception:
        logger.error("Failed to start Spark session")
    # Read the input dataset
    try:
        logger.debug("Start to read the input dataset")
        posts_df=spark.read.json(posts_file)
        tags_df=spark.read.csv(tags_file, header=True)
        selected_tags=pd.read_csv(selected_tags_file, header=None)
        local_tags_to_catId=dict(zip(selected_tags[0], list(selected_tags.index)))
        local_catId_to_tags=dict(zip(list(selected_tags.index), selected_tags[0]))
        tags_to_catId=sc.broadcast(local_tags_to_catId)
        catId_to_tags=sc.broadcast(local_catId_to_tags)
        tags_set=sc.broadcast(set(selected_tags[0]))
        logger.debug("Read in dataset successfully")
    except Exception:
        logger.error("Failed to read the input dataset")
    # Join posts_df and tags_df together and prepare the training dataset
    selected_tags_df=tags_df.filter(tags_df.Tag.isin(tags_set.value)).na.drop(how='any')
    tags_questions_df=selected_tags_df.join(posts_df, "Id")
    training_df=tags_questions_df.select(['Tag', 'Body', 'Id']).na.drop(how='any')
    logger.debug("Successfully built training_df")
    # Tokenize post texts and compute term frequency and inverse document frequency features
    logger.debug("Start to generate TF-IDF features")
    tokenizer=Tokenizer.load(tokenizer_file)
    tokenized_words=tokenizer.transform(training_df.na.drop(how='any'))
    hashing_TF=HashingTF.load(hashing_tf_file)
    TFfeatures=hashing_TF.transform(tokenized_words.na.drop(how='any'))
    idfModel=IDFModel.load(idf_model_file)
    TFIDFfeatures=idfModel.transform(TFfeatures.na.drop(how='any'))
    logger.debug("Generated TF-IDF features successfully")
    # for feature in TFIDFfeatures.select("IDF_features", "Tag").take(3):
    #     logger.info(feature)
    # Example row:
    # Row(IDF_features=SparseVector(200, {7: 2.3773, 9: 2.1588, 32: 2.0067, 37: 1.7143, 49: 2.6727, 59: 2.9361, 114: 1.0654, 145: 2.9522, 167: 2.3751}), Tag=u'asp.net')

    # Register shutdown_hook so the Spark session is stopped on exit
    atexit.register(shutdown_hook, spark_session=spark)
    # Transfer data into labeled point format: (label, features, Id)
    test=TFIDFfeatures.rdd.map(lambda row: (float(tags_to_catId.value[row.Tag]), row.IDF_features, row.Id)).toDF()
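    # toDF() on these unnamed tuples yields columns _1 (label), _2 (features), _3 (Id); the saved
    # Naive Bayes model is assumed to have been trained with labelCol="_1" and featuresCol="_2",
    # matching the evaluator configuration below.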
    # Load the trained Naive Bayes model
    nb_model=NaiveBayesModel.load(nb_model_file)

    # Evaluate the model
    # test_df=test.rdd.map(lambda row: ((row._2, row._3), [row._1])).reduceByKey(lambda a, b: a+b)
    # print(test_df.collect())
    predictions=nb_model.transform(test)
    evaluator=MulticlassClassificationEvaluator(labelCol="_1", predictionCol="prediction", metricName="accuracy")
    accuracy=evaluator.evaluate(predictions)
    print("Test set accuracy = " + str(accuracy/0.6023699978752843))

    # prediction_and_label = test.map(lambda point: (nb_model.predict(point.features), point.label))
    # accuracy = 1.0 * prediction_and_label.filter(lambda x: x[0] == x[1]).count() / test.count()