17 changes: 0 additions & 17 deletions hw0/README.md

This file was deleted.

19 changes: 0 additions & 19 deletions hw1/README.md

This file was deleted.

@@ -0,0 +1,2 @@
id,target
10,0
100 changes: 100 additions & 0 deletions hw2/moreva/task2/src/main/scala/sample/HomeWork1.scala
@@ -0,0 +1,100 @@
package sample

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassifier, RandomForestClassifier}
import org.apache.spark.ml.feature._
import org.apache.spark.mllib.feature.Stemmer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes

object HomeWork1 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Task2")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("OFF")

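    // Test set: keep only rows where both id and text are present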
    val testSet = spark
      .read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("src/main/resources/test.csv")
      .filter(col("id").isNotNull)
      .filter(col("text").isNotNull)
      .select("id", "text")

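    // Training set: drop rows with missing values and rename the label column to tLabel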
    val trainSet = spark
      .read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("src/main/resources/train.csv")
      .filter(col("id").isNotNull)
      .filter(col("target").isNotNull)
      .filter(col("text").isNotNull)
      .select("id", "text", "target")
      .withColumnRenamed("target", "tLabel")

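    // Submission ids, used later so every expected id gets a prediction row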
    val sampleSet = spark
      .read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("src/main/resources/sample_submission.csv")
      .select("id")

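    // Split raw text into tokens on non-word characters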
    val token = new RegexTokenizer()
      .setInputCol("text")
      .setOutputCol("words")
      .setPattern("[\\W]")

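    // Drop English stop words from the token list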
    val stwr = new StopWordsRemover()
      .setInputCol(token.getOutputCol)
      .setOutputCol("removed")

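    // Snowball stemmer (presumably the third-party spark-stemming package);
    // constructed here but never wired into the pipeline below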
    val stemmer = new Stemmer()
      .setInputCol(stwr.getOutputCol)
      .setOutputCol("stemmed")
      .setLanguage("English")

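    // Hash the filtered (unstemmed) tokens into a fixed 10000-dimensional term-frequency vector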
    val hashingTF = new HashingTF()
      .setNumFeatures(10000)
      .setInputCol(stwr.getOutputCol)
      .setOutputCol("rawFeatures")

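    // Reweight raw term frequencies by inverse document frequency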
    val idf = new IDF()
      .setInputCol(hashingTF.getOutputCol)
      .setOutputCol("features")

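    // Index the label; note StringIndexer orders labels by frequency by default,
    // so index 0 is the most frequent class, not necessarily the original label 0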
    val labelIndexer = new StringIndexer()
      .setInputCol("tLabel")
      .setOutputCol("indexedLabel")

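    // Gradient-boosted trees classifier; its prediction column is named "target"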
    val rideg = new GBTClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("features")
      .setPredictionCol("target")
      .setMaxIter(20)

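    // Alternative random-forest classifier; constructed but not used in the pipeline below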
    val rfc = new RandomForestClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("features")
      .setPredictionCol("target")
      .setNumTrees(15)

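    // Assemble and fit the pipeline (the stemmer and random-forest stages are left out)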
    val pipe = new Pipeline()
      .setStages(Array(token, stwr, hashingTF, idf, labelIndexer, rideg))
    val model = pipe.fit(trainSet)

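    // Score the test set, keeping the id and the integer-cast prediction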
    var result = model.transform(testSet)
      .select(col("id"), col("target").cast(DataTypes.IntegerType))

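    // Right-join against the submission ids so every id gets a row, defaulting missing predictions to 0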
    result = result.join(sampleSet, sampleSet.col("id").equalTo(result.col("id")), "right")
      .select(sampleSet.col("id"), when(result.col("id").isNull, lit(0)).otherwise(col("target")).as("target"))

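    // Write the submission CSV and persist the fitted model for reuse in HomeWork2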
    result.write.option("header", "true").csv("src/main/resources/sample_submission32.csv")
    model.write.overwrite()
      .save(sys.env("HOME") + "/Документы/Технополис/ML/bigData2020/hw2/moreva/task2/model/")
  }
}
48 changes: 48 additions & 0 deletions hw2/moreva/task2/src/main/scala/sample/HomeWork2.scala
@@ -0,0 +1,48 @@
package sample

import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataTypes, StringType, StructType}

object HomeWork2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Task3")
      .master("local[*]")
      .getOrCreate()

    spark.sparkContext.setLogLevel("OFF")

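    // Expected JSON schema of each incoming line: an id and a text field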
    val scheme = new StructType()
      .add("id", StringType, nullable = true)
      .add("text", StringType, nullable = true)

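    // Read lines from a local socket as an unbounded streaming DataFrame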
    val inputData = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 8065)
      .load()

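    // Parse every line as JSON and keep the id and text columns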
    val inputJson = inputData
      .withColumn("json", from_json(col("value"), scheme))
      .select("json.*")
      .select(col("id"), col("text"))


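    // Load the pipeline fitted in HomeWork1 and score the stream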
    val model = PipelineModel.read.load("model/")
    inputJson.printSchema()
    val result = model.transform(inputJson)
      .select(col("id"), col("target").cast(DataTypes.IntegerType))

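    // Append each micro-batch as CSV; the query runs until it is terminated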
    val query = result
      .repartition(1)
      .writeStream
      .outputMode("append")
      .format("csv")
      .option("header", "true")
      .option("path", "path/")
      .option("checkpointLocation", "checkpointLocation/")
      .start()

    query.awaitTermination()
  }
}