diff --git a/hw0/README.md b/hw0/README.md
deleted file mode 100644
index b9d1672..0000000
--- a/hw0/README.md
+++ /dev/null
@@ -1,17 +0,0 @@
-Data:
-
-https://www.kaggle.com/dgomonov/new-york-city-airbnb-open-data?select=AB_NYC_2019.csv
-
-
-Tasks:
-
-1) Compute the median, mode, mean, and variance for each room_type
-2) Find the most expensive and the cheapest listing
-3) Compute the correlation between price and the minimum number of nights, and between price and the number of reviews
-4) Find the 5 km by 5 km geographic square with the highest average listing price
-
-P.S. Submit a pull request, and please put your work in a subfolder named after your surname
-
-Technologies:
-
-Spark / Scala / Java
\ No newline at end of file
diff --git a/hw1/README.md b/hw1/README.md
deleted file mode 100644
index e38a446..0000000
--- a/hw1/README.md
+++ /dev/null
@@ -1,19 +0,0 @@
-Data:
-
-https://www.kaggle.com/c/nlp-getting-started/overview
-
-Tasks:
-
-1) Solve the competition task with Spark
-2) Submit your predictions to the competition
-
-Hints:
-
-TF/IDF, LDA, StopWords, Tokenizing, Stemming
-
-
-P.S. Submit a pull request, and please put your work in a subfolder named after your surname
-
-Technologies:
-
-Spark / Scala / Java
\ No newline at end of file
diff --git a/hw2/moreva/task2/path/part-00000-eeaaab8e-ac72-4d12-8107-e4ecff64c167-c000.csv b/hw2/moreva/task2/path/part-00000-eeaaab8e-ac72-4d12-8107-e4ecff64c167-c000.csv
new file mode 100644
index 0000000..b801daa
--- /dev/null
+++ b/hw2/moreva/task2/path/part-00000-eeaaab8e-ac72-4d12-8107-e4ecff64c167-c000.csv
@@ -0,0 +1,2 @@
+id,target
+10,0
diff --git a/hw2/moreva/task2/src/main/scala/sample/HomeWork1.scala b/hw2/moreva/task2/src/main/scala/sample/HomeWork1.scala
new file mode 100644
index 0000000..9cdb98a
--- /dev/null
+++ b/hw2/moreva/task2/src/main/scala/sample/HomeWork1.scala
@@ -0,0 +1,100 @@
+package sample
+
+import org.apache.spark.ml.{Pipeline, PipelineModel}
+import org.apache.spark.ml.classification.{GBTClassifier, RandomForestClassifier}
+import org.apache.spark.ml.feature._
+// Stemmer comes from the third-party spark-stemming package, not from Spark itself
+import org.apache.spark.mllib.feature.Stemmer
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.DataTypes
+
+object HomeWork1 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName("Task2")
+      .master("local[*]")
+      .getOrCreate()
+
+    spark.sparkContext.setLogLevel("OFF")
+
+    val testSet = spark
+      .read.format("csv")
+      .option("header", "true")
+      .option("inferSchema", "true")
+      .load("src/main/resources/test.csv")
+      .filter(col("id").isNotNull)
+      .filter(col("text").isNotNull)
+      .select("id", "text")
+
+    val trainSet = spark
+      .read.format("csv")
+      .option("header", "true")
+      .option("inferSchema", "true")
+      .load("src/main/resources/train.csv")
+      .filter(col("id").isNotNull)
+      .filter(col("target").isNotNull)
+      .filter(col("text").isNotNull)
+      .select("id", "text", "target")
+      .withColumnRenamed("target", "tLabel")
+
+    val sampleSet = spark
+      .read.format("csv")
+      .option("header", "true")
+      .option("inferSchema", "true")
+      .load("src/main/resources/sample_submission.csv")
+      .select("id")
+
+    val token = new RegexTokenizer()
+      .setInputCol("text")
+      .setOutputCol("words")
+      .setPattern("[\\W]")
+
+    val stwr = new StopWordsRemover()
+      .setInputCol(token.getOutputCol)
+      .setOutputCol("removed")
+
+    // Defined but not included in the pipeline stages below, so stemming is
+    // effectively skipped; HashingTF reads the stop-word-filtered tokens directly
+    val stemmer = new Stemmer()
+      .setInputCol(stwr.getOutputCol)
+      .setOutputCol("stemmed")
+      .setLanguage("English")
+
+    val hashingTF = new HashingTF()
+      .setNumFeatures(10000)
+      .setInputCol(stwr.getOutputCol)
+      .setOutputCol("rawFeatures")
+
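+    // Together, HashingTF above and IDF below implement TF-IDF: HashingTF maps
+    // each filtered token list to a sparse 10,000-dimensional term-count vector
+    // via feature hashing (hash collisions are possible at this size), and the
+    // fitted IDF rescales each count by log((numDocs + 1) / (docFreq + 1)),
+    // damping words that appear in almost every tweet.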
+    val idf = new IDF()
+      .setInputCol(hashingTF.getOutputCol)
+      .setOutputCol("features")
+
+    val labelIndexer = new StringIndexer()
+      .setInputCol("tLabel")
+      .setOutputCol("indexedLabel")
+
+    val rideg = new GBTClassifier()
+      .setLabelCol("indexedLabel")
+      .setFeaturesCol("features")
+      .setPredictionCol("target")
+      .setMaxIter(20)
+
+    // Alternative classifier that was tried but is not part of the final pipeline
+    val rfc = new RandomForestClassifier()
+      .setLabelCol("indexedLabel")
+      .setFeaturesCol("features")
+      .setPredictionCol("target")
+      .setNumTrees(15)
+
+    val pipe = new Pipeline()
+      .setStages(Array(token, stwr, hashingTF, idf, labelIndexer, rideg))
+    val model = pipe.fit(trainSet)
+
+    var result = model.transform(testSet).select(col("id"), col("target").cast(DataTypes.IntegerType))
+
+    // Right join against the sample submission so every expected id is present;
+    // ids with no prediction get a default target of 0
+    result = result.join(sampleSet, sampleSet.col("id").equalTo(result.col("id")), "right")
+      .select(sampleSet.col("id"), when(result.col("id").isNull, lit(0)).otherwise(col("target")).as("target"))
+
+    result.write.option("header", "true").csv("src/main/resources/sample_submission32.csv")
+    model.write.overwrite
+      .save(sys.env("HOME") + "/Документы/Технополис/ML/bigData2020/hw2/moreva/task2/model/")
+  }
+}
diff --git a/hw2/moreva/task2/src/main/scala/sample/HomeWork2.scala b/hw2/moreva/task2/src/main/scala/sample/HomeWork2.scala
new file mode 100644
index 0000000..c33740d
--- /dev/null
+++ b/hw2/moreva/task2/src/main/scala/sample/HomeWork2.scala
@@ -0,0 +1,48 @@
+package sample
+
+import org.apache.spark.ml.PipelineModel
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{DataTypes, StringType, StructType}
+
+object HomeWork2 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession.builder()
+      .appName("Task3")
+      .master("local[*]")
+      .getOrCreate()
+
+    spark.sparkContext.setLogLevel("OFF")
+
+    val scheme = new StructType()
+      .add("id", StringType, nullable = true)
+      .add("text", StringType, nullable = true)
+
+    // Each line arriving on the socket is expected to hold one JSON record
+    val inputData = spark.readStream
+      .format("socket")
+      .option("host", "localhost")
+      .option("port", 8065)
+      .load()
+
+    val inputJson = inputData
+      .withColumn("json", from_json(col("value"), scheme))
+      .select("json.*")
+      .select(col("id"), col("text"))
+
+    // Reuse the pipeline trained and saved by HomeWork1
+    val model = PipelineModel.read.load("model/")
+    inputJson.printSchema()
+    val result = model.transform(inputJson.select(col("id"), col("text")))
+      .select(col("id"), col("target").cast(DataTypes.IntegerType))
+
+    // repartition(1) yields a single CSV part file per micro-batch
+    val query = result
+      .repartition(1)
+      .writeStream
+      .outputMode("append")
+      .format("csv")
+      .option("header", "true")
+      .option("path", "path/")
+      .option("checkpointLocation", "checkpointLocation/")
+      .start()
+
+    query.awaitTermination()
+  }
+}
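A note on trying HomeWork2 locally: Spark's socket source connects to localhost:8065 as a client, so something must already be listening on that port when the query starts (e.g. `nc -lk 8065`). The sketch below is a minimal stand-in for that, assuming one JSON record per line matching the {id, text} schema is enough for a smoke test; the SocketFeeder object and the sample record are invented for illustration and are not part of this PR.

    import java.io.PrintWriter
    import java.net.ServerSocket

    object SocketFeeder {
      def main(args: Array[String]): Unit = {
        // Listen on the port HomeWork2 reads from; start this before the query
        val server = new ServerSocket(8065)
        println("Listening on 8065, waiting for the streaming query to connect...")
        val client = server.accept() // the Spark socket source connects as a client
        val out = new PrintWriter(client.getOutputStream, true) // autoFlush, so each line is sent immediately
        // One JSON object per line, matching the schema parsed by from_json
        out.println("""{"id": "10", "text": "smoke reported near the old warehouse"}""")
        out.close()
        client.close()
        server.close()
      }
    }

With the feeder running, the query should append a CSV part file under path/ holding the predicted target for that record; the committed part-00000 file above looks like the output of one such run.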