diff --git a/streaming-twitter/build.sbt b/streaming-twitter/build.sbt index 8da1465d..32d75c64 100644 --- a/streaming-twitter/build.sbt +++ b/streaming-twitter/build.sbt @@ -2,28 +2,36 @@ name := "streaming-twitter" version := "1.6" -scalaVersion := "2.10.4" +scalaVersion := "2.11.4" libraryDependencies ++= { - val sparkVersion = "1.6.0" + val sparkVersion = "2.1.1" Seq( "org.apache.spark" %% "spark-core" % sparkVersion % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", - "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion, + // "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion, + "org.apache.bahir" %% "spark-streaming-twitter" % sparkVersion, "org.apache.spark" %% "spark-repl" % sparkVersion % "provided", - "com.ibm" %% "couchdb-scala" % "0.5.3", + "com.ibm" %% "couchdb-scala" % "0.5.1", "org.apache.kafka" % "kafka-log4j-appender" % "0.9.0.0", "org.apache.kafka" % "kafka-clients" % "0.9.0.0", "org.apache.kafka" %% "kafka" % "0.9.0.0", - "com.google.guava" % "guava" % "14.0.1" + "com.google.guava" % "guava" % "14.0.1", + "org.twitter4j" % "twitter4j-core" % "4.0.4", + "org.twitter4j" % "twitter4j-stream" % "4.0.4", + "org.json" % "json" % "20180130" ) } assemblyMergeStrategy in assembly := { - case PathList("org", "apache", "spark", xs @ _*) => MergeStrategy.first - case PathList("scala", xs @ _*) => MergeStrategy.discard - case PathList("com", "ibm", "pixiedust", xs @ _*) => MergeStrategy.discard + case PathList("org", "apache", "spark", xs @ _*) => MergeStrategy.first + case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first + case PathList("com", "google", xs @ _*) => MergeStrategy.first + case PathList("org", "apache", xs @ _*) => MergeStrategy.first + case PathList("javax", "xml", xs @ _*) => MergeStrategy.first + case PathList("scala", xs @ _*) => MergeStrategy.discard + case PathList("com", "ibm", "pixiedust", xs @ _*) => MergeStrategy.discard case PathList("META-INF", "maven", "org.slf4j", xs @ _* ) => MergeStrategy.first case x => val oldStrategy = (assemblyMergeStrategy in assembly).value @@ -32,6 +40,7 @@ assemblyMergeStrategy in assembly := { unmanagedBase <<= baseDirectory { base => base / "lib" } +//Important line below. This strips out all the scala dependencies and shrinks down your jar into skinny jar assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false) resolvers += "scalaz-bintray" at "https://dl.bintray.com/scalaz/releases" diff --git a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/KafkaProducerTest.scala b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/KafkaProducerTest.scala index 46303385..ea531b86 100644 --- a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/KafkaProducerTest.scala +++ b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/KafkaProducerTest.scala @@ -36,20 +36,22 @@ import twitter4j.Status import twitter4j.StatusDeletionNotice import twitter4j.StatusListener import twitter4j.TwitterStreamFactory +import twitter4j.TwitterStream import scala.util.parsing.json.JSON import java.io.InputStream -import twitter4j.TwitterStream import com.ibm.cds.spark.samples.config.DemoConfig -import org.apache.spark.Logging +// import org.apache.spark.Logging +import org.apache.log4j.Logger /** * @author dtaieb */ -object KafkaProducerTest extends Logging{ +object KafkaProducerTest { //Very verbose, enable only if necessary //Logger.getLogger("org.apache.kafka").setLevel(Level.ALL) //Logger.getLogger("kafka").setLevel(Level.ALL) + val log = Logger.getLogger(getClass.getName) var twitterStream : TwitterStream = _; @@ -79,11 +81,13 @@ object KafkaProducerTest extends Logging{ def onStatus(status: Status){ if ( lastSent == 0 || System.currentTimeMillis() - lastSent > 200L){ lastSent = System.currentTimeMillis() - logInfo("Got a status " + status.getText ) + // logInfo("Got a status " + status.getText ) + log.info("Got a status " + status.getText ) val producerRecord = new ProducerRecord(kafkaProps.getConfig(MessageHubConfig.KAFKA_TOPIC_TWEETS ), "tweet", status ) try{ val metadata = kafkaProducer.send( producerRecord ).get(2000, TimeUnit.SECONDS); - logInfo("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset ) + // logInfo("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset ) + log.info("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset ) }catch{ case e:Throwable => e.printStackTrace } @@ -98,7 +102,8 @@ object KafkaProducerTest extends Logging{ def onException( e: Exception){ println("Unexpected error from twitterStream: " + e.getMessage); - logError(e.getMessage, e) + // logError(e.getMessage, e) + log.error(e.getMessage, e) } def onScrubGeo(lat: Long, long: Long ){ @@ -149,4 +154,4 @@ object KafkaConsumerTest { } }).start } -} \ No newline at end of file +} diff --git a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/MessageHubStreamingTwitter.scala b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/MessageHubStreamingTwitter.scala index 18ca53de..1dfa086e 100644 --- a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/MessageHubStreamingTwitter.scala +++ b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/MessageHubStreamingTwitter.scala @@ -50,14 +50,17 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted import org.apache.spark.broadcast.Broadcast -import org.apache.spark.Logging +// import org.apache.spark.Logging +import org.apache.log4j.Logger + import java.util.Arrays /** * @author dtaieb * Twitter+Watson sample app with MessageHub/Kafka */ -object MessageHubStreamingTwitter extends Logging{ +object MessageHubStreamingTwitter { + val log = Logger.getLogger(getClass.getName) var ssc: StreamingContext = null val reuseCheckpoint = false; @@ -70,7 +73,7 @@ object MessageHubStreamingTwitter extends Logging{ //Logger.getLogger("org.apache.kafka").setLevel(Level.ALL) //Logger.getLogger("kafka").setLevel(Level.ALL) - Logger.getLogger("org.apache.spark").setLevel(Level.WARN) + // Logger.getLogger("org.apache.spark").setLevel(Level.WARN) def main(args: Array[String]): Unit = { println("Printing arguments: "); @@ -151,10 +154,12 @@ object MessageHubStreamingTwitter extends Logging{ if ( task != null ){ val producerRecord = new ProducerRecord[String,String](task._1, "tweet", task._2 ) val metadata = kafkaProducer.send( producerRecord ).get; - logInfo("Sent record " + metadata.offset() + " Topic " + task._1) + // logInfo("Sent record " + metadata.offset() + " Topic " + task._1) + log.info("Sent record " + metadata.offset() + " Topic " + task._1) } }catch{ - case e:Throwable => logError(e.getMessage, e) + // case e:Throwable => logError(e.getMessage, e) + case e:Throwable => log.error(e.getMessage, e) } } queue.synchronized{ @@ -206,11 +211,11 @@ object MessageHubStreamingTwitter extends Logging{ val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar ) var scoreMap : Map[String, Double] = Map() if ( sentiment != null ){ - for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ - for ( tone <- Option( toneCategory.tones ).getOrElse( Seq() ) ){ + // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ + for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){ scoreMap.put( tone.tone_id, (BigDecimal(tone.score).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 ) } - } + //} } EnrichedTweet( @@ -240,7 +245,8 @@ object MessageHubStreamingTwitter extends Logging{ try{ queue.notify }catch{ - case e:Throwable=>logError(e.getMessage, e) + // case e:Throwable=>logError(e.getMessage, e) + case e:Throwable=>log.error(e.getMessage, e) } } }) @@ -302,7 +308,8 @@ object MessageHubStreamingTwitter extends Logging{ try{ queue.notify }catch{ - case e:Throwable=>logError(e.getMessage, e) + // case e:Throwable=>logError(e.getMessage, e) + case e:Throwable=>log.error(e.getMessage, e) } } } @@ -322,7 +329,8 @@ object MessageHubStreamingTwitter extends Logging{ } } -object TweetsMetricJsonSerializer extends Logging{ +object TweetsMetricJsonSerializer { + val log = Logger.getLogger(getClass.getName) def serialize(value: Seq[(String,Long)] ): String = { val sb = new StringBuilder("[") var comma = "" @@ -331,12 +339,14 @@ object TweetsMetricJsonSerializer extends Logging{ comma="," }) sb.append("]") - logInfo("Serialized json: " + sb) + // logInfo("Serialized json: " + sb) + log.info("Serialized json: " + sb) sb.toString() } } -object ToneScoreJsonSerializer extends Logging{ +object ToneScoreJsonSerializer { + val log = Logger.getLogger(getClass.getName) def serializeList[U:ClassTag]( label: String, value: List[U] ):String = { val sb = new StringBuilder("[\"" + label.replaceAll("\"", "") + "\"") value.foreach { item => { @@ -364,7 +374,8 @@ object ToneScoreJsonSerializer extends Logging{ comma="," }) sb.append("]") - logInfo("Serialized size: " + value.size + ". Tone json: " + sb) + // logInfo("Serialized size: " + value.size + ". Tone json: " + sb) + log.info("Serialized size: " + value.size + ". Tone json: " + sb) sb.toString() } -} \ No newline at end of file +} diff --git a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/PixiedustStreamingTwitter.scala b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/PixiedustStreamingTwitter.scala index 7f3fdcb2..7a87df10 100644 --- a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/PixiedustStreamingTwitter.scala +++ b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/PixiedustStreamingTwitter.scala @@ -19,7 +19,7 @@ package com.ibm.cds.spark.samples import scala.collection.mutable._ import com.ibm.pixiedust.ChannelReceiver -import org.apache.spark.Logging +// import org.apache.spark.Logging import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.SparkContext @@ -49,7 +49,8 @@ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.broadcast.Broadcast import org.apache.spark.HashPartitioner import twitter4j.Status -import org.codehaus.jettison.json.JSONObject +// import org.codehaus.jettison.json.JSONObject +import org.json.JSONObject import org.apache.spark.AccumulableParam import org.apache.spark.streaming.StreamingContextState import org.apache.spark.sql.DataFrame @@ -57,7 +58,7 @@ import org.apache.spark.sql.DataFrame /* @author dtaieb * Twitter+Watson sentiment analysis app powered by Pixiedust */ -object PixiedustStreamingTwitter extends ChannelReceiver() with Logging{ +object PixiedustStreamingTwitter extends ChannelReceiver() { var ssc: StreamingContext = null var workingRDD: RDD[Row] = null //Hold configuration key/value pairs @@ -222,11 +223,11 @@ object PixiedustStreamingTwitter extends ChannelReceiver() with Logging{ val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar ) var scoreMap : Map[String, Double] = Map() if ( sentiment != null ){ - for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ - for ( tone <- Option( toneCategory.tones ).getOrElse( Seq() ) ){ + // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ + for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){ scoreMap.put( tone.tone_id, (BigDecimal(tone.score).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 ) } - } + // } } var jsonSentiment="{"; @@ -342,4 +343,4 @@ object TweetsAccumulatorParam extends AccumulableParam[Array[(String,String)], ( def addAccumulator(current:Array[(String,String)], s:(String,String)):Array[(String,String)] = { current :+ s } -} \ No newline at end of file +} diff --git a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/StreamingTwitter.scala b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/StreamingTwitter.scala index 7410f991..8ae62957 100644 --- a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/StreamingTwitter.scala +++ b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/StreamingTwitter.scala @@ -46,7 +46,7 @@ import com.google.common.base.CharMatcher import scala.math.BigDecimal import com.ibm.cds.spark.samples.config.DemoConfig import com.ibm.cds.spark.samples.ToneAnalyzer.ToneCategory -import org.apache.spark.Logging +// import org.apache.spark.Logging @@ -54,7 +54,8 @@ import org.apache.spark.Logging /** * @author dtaieb */ -object StreamingTwitter extends Logging{ +object StreamingTwitter { + val log = Logger.getLogger(getClass.getName) var ssc: StreamingContext = null var sqlContext: SQLContext = null var workingRDD: RDD[Row] = null @@ -152,11 +153,11 @@ object StreamingTwitter extends Logging{ var scoreMap : Map[String, Double] = Map() if ( sentiment != null ){ - for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ - for ( tone <- Option( toneCategory.tones ).getOrElse( Seq() ) ){ + // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ + for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){ scoreMap.put( tone.tone_id, tone.score ) } - } + // } } colValues = colValues ++ ToneAnalyzer.sentimentFactors.map { f => (BigDecimal(scoreMap.get(f._2).getOrElse(0.0)).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 } @@ -199,14 +200,16 @@ object StreamingTwitter extends Logging{ } }catch{ case e: InterruptedException=>//Ignore - case e: Exception => logError(e.getMessage, e ) + // case e: Exception => logError(e.getMessage, e ) + case e: Exception => log.error(e.getMessage, e ) }finally{ canStopTwitterStream = true } }) }catch{ - case e : Exception => logError(e.getMessage, e ) + // case e : Exception => logError(e.getMessage, e ) + case e : Exception => log.error(e.getMessage, e ) return } ssc.start() @@ -239,7 +242,7 @@ object StreamingTwitter extends Logging{ } } - def saveTweetToCloudant(client: Client, db: CouchDbApi, status:Status, sentiment: ToneAnalyzer.Sentiment) : Status = { + def saveTweetToCloudant(client: Client, db: CouchDbApi, status:Status, sentiment: ToneAnalyzer.ToneCategory) : Status = { if ( db != null){ logger.trace("Creating new Tweet in Couch Database " + status.getText()) val task:Task[Res.DocOk] = db.docs.create( @@ -258,7 +261,8 @@ object StreamingTwitter extends Logging{ // Execute the actions and process the result task.attemptRun match { - case -\/(e) => logError(e.getMessage, e ); + // case -\/(e) => logError(e.getMessage, e ); + case -\/(e) => log.error(e.getMessage, e ); case \/-(a) => logger.trace("Successfully create new Tweet in Couch Database " + status.getText() ) } } @@ -282,7 +286,8 @@ object StreamingTwitter extends Logging{ (sqlContext, df) }catch{ - case e: Exception => {logError(e.getMessage, e ); return null} + // case e: Exception => {logError(e.getMessage, e ); return null} + case e: Exception => {log.error(e.getMessage, e ); return null} } } @@ -302,4 +307,4 @@ object StreamingTwitter extends Logging{ println("df.printSchema") println("sqlContext.sql(\"select author, text from tweets\").show") } -} \ No newline at end of file +} diff --git a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/ToneAnalyzer.scala b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/ToneAnalyzer.scala index 63504487..ad43f3b7 100644 --- a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/ToneAnalyzer.scala +++ b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/ToneAnalyzer.scala @@ -11,15 +11,19 @@ import org.http4s.Method import org.http4s.headers.Authorization import org.apache.log4j.Logger import org.apache.spark.broadcast.Broadcast -import org.apache.spark.Logging -import scala.util.parsing.json.JSON -import org.codehaus.jettison.json.JSONObject +// import org.apache.spark.Logging +import org.apache.log4j.Logger +// import scala.util.parsing.json.JSON +// import org.codehaus.jettison.json.JSONObject +import org.json.JSONObject +import scala.util.parsing.json._ /** * @author dtaieb */ -object ToneAnalyzer extends Logging{ +object ToneAnalyzer { + val log = Logger.getLogger(getClass.getName) val sentimentFactors = Array( ("Anger","anger"), @@ -38,9 +42,11 @@ object ToneAnalyzer extends Logging{ ) //Class models for Sentiment JSON - case class DocumentTone( document_tone: Sentiment ) - case class Sentiment(tone_categories: Seq[ToneCategory]); - case class ToneCategory(category_id: String, category_name: String, tones: Seq[Tone]); + // case class DocumentTone( document_tone: Sentiment ) + // case class Sentiment(tone_categories: Seq[ToneCategory]); + // case class ToneCategory(category_id: String, category_name: String, tones: Seq[Tone]); + case class DocumentTone(document_tone: ToneCategory) + case class ToneCategory(tones: Seq[Tone]); case class Tone(score: Double, tone_id: String, tone_name: String) // case class Sentiment( scorecard: String, children: Seq[Tone] ) // case class Tone( name: String, id: String, children: Seq[ToneResult]) @@ -48,10 +54,11 @@ object ToneAnalyzer extends Logging{ // case class LinguisticEvidence( evidence_score: Double, word_count: Double, correlation: String, words : Seq[String]) case class Geo( lat: Double, long: Double ) - case class Tweet(author: String, date: String, language: String, text: String, geo : Geo, sentiment : Sentiment ) + case class Tweet(author: String, date: String, language: String, text: String, geo : Geo, sentiment : ToneCategory) - def computeSentiment( client: Client, status:StatusAdapter, broadcastVar: Broadcast[Map[String,String]] ) : Sentiment = { - logTrace("Calling sentiment from Watson Tone Analyzer: " + status.text) + def computeSentiment( client: Client, status:StatusAdapter, broadcastVar: Broadcast[Map[String,String]] ) : ToneCategory = { + // logTrace("Calling sentiment from Watson Tone Analyzer: " + status.text) + log.trace("Calling sentiment from Watson Tone Analyzer: " + status.text) try{ //Get Sentiment on the tweet val sentimentResults: String = @@ -90,4 +97,4 @@ object ToneAnalyzer extends Logging{ } } } -} \ No newline at end of file +}