From c1380101cd1795ca4a6e1d700dc83387f4b45772 Mon Sep 17 00:00:00 2001 From: Thomas Hornung Date: Fri, 16 Mar 2018 10:33:10 +0100 Subject: [PATCH 1/2] adapted code to new return values object of tone analyzer, updated to current dsx scala and spark version, modified old Logging class to use only log4j, JSONObject now in a different package --- streaming-twitter/KafkaProducerTest.scala | 157 ++++++++ .../MessageHubStreamingTwitter.scala | 381 ++++++++++++++++++ .../PixiedustStreamingTwitter.scala | 346 ++++++++++++++++ streaming-twitter/StatusSerializer.scala | 44 ++ streaming-twitter/StreamingListener.scala | 50 +++ streaming-twitter/StreamingTwitter.scala | 310 ++++++++++++++ streaming-twitter/ToneAnalyzer.scala | 100 +++++ streaming-twitter/TwitterAdapter.scala | 87 ++++ streaming-twitter/build.sbt | 25 +- 9 files changed, 1492 insertions(+), 8 deletions(-) create mode 100644 streaming-twitter/KafkaProducerTest.scala create mode 100644 streaming-twitter/MessageHubStreamingTwitter.scala create mode 100644 streaming-twitter/PixiedustStreamingTwitter.scala create mode 100644 streaming-twitter/StatusSerializer.scala create mode 100644 streaming-twitter/StreamingListener.scala create mode 100644 streaming-twitter/StreamingTwitter.scala create mode 100644 streaming-twitter/ToneAnalyzer.scala create mode 100644 streaming-twitter/TwitterAdapter.scala diff --git a/streaming-twitter/KafkaProducerTest.scala b/streaming-twitter/KafkaProducerTest.scala new file mode 100644 index 00000000..ea531b86 --- /dev/null +++ b/streaming-twitter/KafkaProducerTest.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.ibm.cds.spark.samples + +import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream +import java.io.ObjectInputStream +import java.io.ObjectOutputStream +import java.util.concurrent.TimeUnit +import scala.collection.JavaConversions.mapAsJavaMap +import scala.collection.JavaConversions.seqAsJavaList +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.serialization.Deserializer +import org.apache.kafka.common.serialization.Serializer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.log4j.Level +import org.apache.log4j.Logger +import com.ibm.cds.spark.samples.config.MessageHubConfig +import twitter4j.StallWarning +import twitter4j.Status +import twitter4j.StatusDeletionNotice +import twitter4j.StatusListener +import twitter4j.TwitterStreamFactory +import twitter4j.TwitterStream +import scala.util.parsing.json.JSON +import java.io.InputStream +import com.ibm.cds.spark.samples.config.DemoConfig +// import org.apache.spark.Logging +import org.apache.log4j.Logger + + +/** + * @author dtaieb + */ +object KafkaProducerTest { + //Very verbose, enable only if necessary + //Logger.getLogger("org.apache.kafka").setLevel(Level.ALL) + //Logger.getLogger("kafka").setLevel(Level.ALL) + val log = Logger.getLogger(getClass.getName) + + var twitterStream : TwitterStream = _; + + def main(args: Array[String]): Unit = { + createTwitterStream(); + } + + def createTwitterStream(props: DemoConfig=null):TwitterStream = { + if( twitterStream != null){ + println("Twitter Stream already running. Please call closeTwitterStream first"); + return twitterStream; + } + var kafkaProps:MessageHubConfig = null; + if ( props == null ){ + kafkaProps = new MessageHubConfig + }else{ + kafkaProps = props.cloneConfig + } + kafkaProps.setValueSerializer[StatusSerializer] + kafkaProps.validateConfiguration("watson.tone.") + kafkaProps.createTopicsIfNecessary( kafkaProps.getConfig(MessageHubConfig.KAFKA_TOPIC_TWEETS ) ) + val kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer[java.lang.String, Status]( kafkaProps.toImmutableMap() ); + + twitterStream = new TwitterStreamFactory().getInstance(); + twitterStream.addListener( new StatusListener(){ + var lastSent:Long = 0; + def onStatus(status: Status){ + if ( lastSent == 0 || System.currentTimeMillis() - lastSent > 200L){ + lastSent = System.currentTimeMillis() + // logInfo("Got a status " + status.getText ) + log.info("Got a status " + status.getText ) + val producerRecord = new ProducerRecord(kafkaProps.getConfig(MessageHubConfig.KAFKA_TOPIC_TWEETS ), "tweet", status ) + try{ + val metadata = kafkaProducer.send( producerRecord ).get(2000, TimeUnit.SECONDS); + // logInfo("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset ) + log.info("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset ) + }catch{ + case e:Throwable => e.printStackTrace + } + } + } + def onDeletionNotice( notice: StatusDeletionNotice){ + + } + def onTrackLimitationNotice( numLimitation : Int){ + println("Received track limitation notice from Twitter: " + numLimitation) + } + + def onException( e: Exception){ + println("Unexpected error from twitterStream: " + e.getMessage); + // logError(e.getMessage, e) + log.error(e.getMessage, e) + } + + def onScrubGeo(lat: Long, long: Long ){ + + } + + def onStallWarning(warning: StallWarning ){ + + } + }) + + //Start twitter stream sampling + 
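//sample() starts twitter4j's random-sample stream (roughly 1% of all public statuses); it returns
+      //immediately and delivers statuses asynchronously to the StatusListener registered above
+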
twitterStream.sample(); + + println("Twitter stream started. Tweets will flow to MessageHub instance. Please call closeTwitterStream to stop the stream") + twitterStream + } + + def closeTwitterStream(){ + if ( twitterStream==null){ + println("Nothing to close. Twitter stream has not been started") + }else{ + println("Stopping twitter stream"); + twitterStream.shutdown() + twitterStream=null + println("Twitter Stream stopped") + } + } +} + +object KafkaConsumerTest { + def main(args: Array[String]): Unit = { + val kafkaProps = new MessageHubConfig + kafkaProps.validateConfiguration("watson.tone.") + val kafkaConsumer = new KafkaConsumer[java.lang.String, StatusAdapter](kafkaProps.toImmutableMap, new StringDeserializer(), new StatusDeserializer()) + + kafkaConsumer.subscribe( List(kafkaProps.getConfig(MessageHubConfig.KAFKA_TOPIC_TWEETS )) ) + new Thread( new Runnable { + def run(){ + while( true ){ + Thread.sleep( 1000L ) + val it = kafkaConsumer.poll(1000L).iterator + while( it.hasNext() ){ + val record = it.next(); + println( record.value ); + } + } + } + }).start + } +} diff --git a/streaming-twitter/MessageHubStreamingTwitter.scala b/streaming-twitter/MessageHubStreamingTwitter.scala new file mode 100644 index 00000000..1dfa086e --- /dev/null +++ b/streaming-twitter/MessageHubStreamingTwitter.scala @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ibm.cds.spark.samples + +import scala.BigDecimal +import scala.collection.JavaConversions.mapAsJavaMap +import scala.collection.immutable.Seq.canBuildFrom +import scala.collection.mutable.ListBuffer +import scala.collection.mutable.Map +import scala.reflect.ClassTag +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.apache.spark.HashPartitioner +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row +import org.apache.spark.streaming.Duration +import org.apache.spark.streaming.Seconds +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions +import org.http4s.client.blaze.PooledHttp1Client +import com.google.common.base.CharMatcher +import com.ibm.cds.spark.samples.config.MessageHubConfig +import com.ibm.cds.spark.samples.dstream.KafkaStreaming.KafkaStreamingContextAdapter +import twitter4j.Status +import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted +import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted +import com.ibm.cds.spark.samples.config.DemoConfig +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted +import org.apache.spark.broadcast.Broadcast +// import org.apache.spark.Logging +import org.apache.log4j.Logger + +import java.util.Arrays + +/** + * @author dtaieb + * Twitter+Watson sample app with MessageHub/Kafka + */ +object MessageHubStreamingTwitter { + val log = Logger.getLogger(getClass.getName) + + var ssc: StreamingContext = null + val reuseCheckpoint = false; + + val queue = new scala.collection.mutable.Queue[(String, String)] + + final val KAFKA_TOPIC_TOP_HASHTAGS = "topHashTags" + final val KAFKA_TOPIC_TONE_SCORES = "topHashTags.toneScores" + final val KAFKA_TOPIC_TOTAL_TWEETS_PROCESSED = "total_tweets" + + //Logger.getLogger("org.apache.kafka").setLevel(Level.ALL) + //Logger.getLogger("kafka").setLevel(Level.ALL) + // Logger.getLogger("org.apache.spark").setLevel(Level.WARN) + + def main(args: Array[String]): Unit = { + println("Printing arguments: "); + args.foreach { println } + + if(args.length>0 && System.getProperty("DEMO_CONFIG_PATH") == null ){ + //On Spark Service, input files are passed as parameters, if available, we assume first parameter is config file + System.setProperty("DEMO_CONFIG_PATH", args(0)) + } + val conf = new SparkConf().setAppName("Spark Streaming Twitter + Watson with MessageHub/Kafka Demo") + val sc = new SparkContext(conf) + startTwitterStreaming(sc); + + if(ssc!=null){ + //When running as stand alone app, we call awaitTermination to make sure the JVM doesn't exit prematurely due to the fact + //that all non-daemon threads have terminated. 
Note: Don't call awaitTermination directly from startTwitterStreaming as it can be run + //From Notebook + ssc.awaitTermination() + } + } + + //Hold configuration key/value pairs + lazy val kafkaProps = new MessageHubConfig + + //Wrapper api for Notebook access + def getConfig():DemoConfig={ + kafkaProps + } + + def startTwitterStreaming( sc: SparkContext, stopAfter: Duration = Seconds(0) ){ + if ( ssc != null ){ + println("Twitter Stream already running"); + return; + } + + kafkaProps.setValueSerializer[StringSerializer]; + + if ( !kafkaProps.validateConfiguration("twitter4j.oauth") ){ + return; + } + + //Set the hadoop configuration if needed + val checkpointDir = kafkaProps.getConfig( MessageHubConfig.CHECKPOINT_DIR_KEY ); + if ( checkpointDir.startsWith("swift") ){ + println("Setting hadoop configuration for swift container") + kafkaProps.set_hadoop_config(sc) + } + + //Make sure the topics are already created + kafkaProps.createTopicsIfNecessary( KAFKA_TOPIC_TONE_SCORES, KAFKA_TOPIC_TOP_HASHTAGS, KAFKA_TOPIC_TOTAL_TWEETS_PROCESSED ) + + val kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer[String, String]( kafkaProps.toImmutableMap ); + + if ( !reuseCheckpoint ){ + createStreamingContextAndRunAnalytics(sc); + }else{ + ssc = StreamingContext.getOrCreate( + kafkaProps.getConfig( MessageHubConfig.CHECKPOINT_DIR_KEY ), + () => { + createStreamingContextAndRunAnalytics(sc); + }, + sc.hadoopConfiguration, + true + ); + } + + ssc.addStreamingListener( new StreamingListener() ) + + new Thread( new Runnable() { + def run(){ + while(ssc!=null){ + while(!queue.isEmpty ){ + try{ + var task:(String,String) = null; + queue.synchronized{ + task = queue.dequeue(); + } + if ( task != null ){ + val producerRecord = new ProducerRecord[String,String](task._1, "tweet", task._2 ) + val metadata = kafkaProducer.send( producerRecord ).get; + // logInfo("Sent record " + metadata.offset() + " Topic " + task._1) + log.info("Sent record " + metadata.offset() + " Topic " + task._1) + } + }catch{ + // case e:Throwable => logError(e.getMessage, e) + case e:Throwable => log.error(e.getMessage, e) + } + } + queue.synchronized{ + queue.wait(); + } + } + } + },"Message Hub producer").start + + ssc.start + + println("Twitter stream started"); + println("Tweets are collected real-time and analyzed") + println("To stop the streaming and start interacting with the data use: StreamingTwitter.stopTwitterStreaming") + + if ( !stopAfter.isZero ){ + //Automatically stop it after 10s + new Thread( new Runnable { + def run(){ + Thread.sleep( stopAfter.milliseconds ) + stopTwitterStreaming + } + }).start + } + } + + def createStreamingContextAndRunAnalytics(sc:SparkContext):StreamingContext={ + //Broadcast the config to each worker node + val broadcastVar = sc.broadcast( kafkaProps.toImmutableMap ) + ssc = new StreamingContext( sc, Seconds(5) ) + ssc.checkpoint(kafkaProps.getConfig( MessageHubConfig.CHECKPOINT_DIR_KEY )); + val stream = ssc.createKafkaStream[String, StatusAdapter,StringDeserializer, StatusDeserializer]( + kafkaProps, + List(kafkaProps.getConfig(MessageHubConfig.KAFKA_TOPIC_TWEETS )) + ); + runAnalytics(sc, broadcastVar, stream) + ssc; + } + + def runAnalytics(sc:SparkContext, broadcastVar: Broadcast[scala.collection.immutable.Map[String,String]], stream:DStream[(String,StatusAdapter)]){ + val keys = broadcastVar.value.get("tweets.key").get.split(","); + val tweets = stream.map( t => t._2) + .filter { status => + status.userLang.startsWith("en") && CharMatcher.ASCII.matchesAllOf(status.text) && ( 
keys.isEmpty || keys.exists{status.text.contains(_)}) + } + + val rowTweets = tweets.map(status=> { + lazy val client = PooledHttp1Client() + val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar ) + var scoreMap : Map[String, Double] = Map() + if ( sentiment != null ){ + // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ + for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){ + scoreMap.put( tone.tone_id, (BigDecimal(tone.score).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 ) + } + //} + } + + EnrichedTweet( + status.userName, + status.userId, + status.createdAt, + status.userLang, + status.text, + status.long, + status.lat, + scoreMap + ) + }) + + val delimTagTone = "-%!" + val delimToneScore = ":%@" + val statsStream = rowTweets.map { eTweet => ("total_tweets", 1L) } + .reduceByKey( _+_ ) + .updateStateByKey( (a:Seq[Long], b:Option[Long] ) => { + var runningCount=b.getOrElse(0L) + a.foreach { v => runningCount=runningCount+v } + Some(runningCount) + }) + statsStream.foreachRDD( rdd =>{ + queue.synchronized{ + queue+=((KAFKA_TOPIC_TOTAL_TWEETS_PROCESSED, TweetsMetricJsonSerializer.serialize(rdd.collect()))) + try{ + queue.notify + }catch{ + // case e:Throwable=>logError(e.getMessage, e) + case e:Throwable=>log.error(e.getMessage, e) + } + } + }) + + val metricsStream = rowTweets.flatMap { eTweet => { + val retList = ListBuffer[String]() + for ( tag <- eTweet.text.split("\\s+") ){ + if ( tag.startsWith( "#") && tag.length > 1 ){ + for ( tone <- Option( eTweet.sentimentScores.keys ).getOrElse( Seq() ) ){ + retList += (tag + delimTagTone + tone + delimToneScore + eTweet.sentimentScores.getOrElse( tone, 0.0)) + } + } + } + retList.toList + }} + .map { fullTag => { + val split = fullTag.split(delimToneScore); + (split(0), split(1).toFloat) + }} + .combineByKey( + (x:Float) => (x,1), + (x:(Float,Int), y:Float) => (x._1 + y, x._2+1), + (x:(Float,Int),y:(Float,Int)) => (x._1 + y._1, x._2 + y._2), + new HashPartitioner(sc.defaultParallelism) + ) + .map[(String,(Long/*count*/, List[(String, Double)]))]{ t => { + val key = t._1; + val ab = t._2; + val split = key.split(delimTagTone) + (split(0), (ab._2, List((split(1), BigDecimal(ab._1/ab._2).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble )))) + }} + .reduceByKey( (t,u) => (t._1+u._1, (t._2 ::: u._2).sortWith( (l,r) => l._1.compareTo( r._1 ) < 0 ))) + .mapValues( (item:(Long, List[(String,Double)])) => { + val unzip = item._2.unzip + (item._1/(item._2.size), unzip._1, unzip._2) + }) + .updateStateByKey( (a:scala.collection.Seq[(Long, List[String], List[Double])], b: Option[(Long, List[String], List[Double])]) => { + val safeB = b.getOrElse( (0L, List(), List() ) ) + var listTones = safeB._2 + var listScores = safeB._3 + var count = safeB._1 + for( item <- a ){ + count += item._1 + listScores = listScores.zipAll( item._3, 0.0, 0.0).map{ case(a,b)=>(a+b)/2 }.toList + listTones = item._2 + } + + Some( (count, listTones, listScores) ) + }) + + metricsStream.print + + metricsStream.foreachRDD( rdd =>{ + val topHashTags = rdd.sortBy( f => f._2._1, false ).take(5) + if ( !topHashTags.isEmpty){ + queue.synchronized{ + queue += ((KAFKA_TOPIC_TOP_HASHTAGS, TweetsMetricJsonSerializer.serialize(topHashTags.map( f => (f._1, f._2._1 ))))) + queue += ((KAFKA_TOPIC_TONE_SCORES, ToneScoreJsonSerializer.serialize(topHashTags))) + try{ + queue.notify + }catch{ + // case e:Throwable=>logError(e.getMessage, e) + case e:Throwable=>log.error(e.getMessage, e) + } + } + } + }) + } + + def 
stopTwitterStreaming(){ + if ( ssc == null){ + println("No Twitter stream to stop"); + return; + } + + println("Stopping Twitter stream. Please wait this may take a while") + ssc.stop(stopSparkContext = false, stopGracefully = true) + ssc = null + println("Twitter stream stopped"); + } +} + +object TweetsMetricJsonSerializer { + val log = Logger.getLogger(getClass.getName) + def serialize(value: Seq[(String,Long)] ): String = { + val sb = new StringBuilder("[") + var comma = "" + value.foreach( item => { + sb.append( comma + "[\"" + item._1.replaceAll("\"", "") + "\"," + item._2 + "]") + comma="," + }) + sb.append("]") + // logInfo("Serialized json: " + sb) + log.info("Serialized json: " + sb) + sb.toString() + } +} + +object ToneScoreJsonSerializer { + val log = Logger.getLogger(getClass.getName) + def serializeList[U:ClassTag]( label: String, value: List[U] ):String = { + val sb = new StringBuilder("[\"" + label.replaceAll("\"", "") + "\"") + value.foreach { item => { + if ( item.isInstanceOf[String] ) { + val s = ",\"" + item.toString().replaceAll("\"", "") + "\""; + sb.append( s.replaceAll("\"\"", "\"") ) + }else if ( item.isInstanceOf[Double] ){ + sb.append("," + item ) + } + }} + sb.append("]") + sb.toString + } + def serialize(value:Seq[(String, (Long, List[String], List[Double]))]):String={ + val sb = new StringBuilder("[") + var comma = "" + var appendToneData = true; + value.foreach( item => { + if ( appendToneData ){ + sb.append( comma + serializeList( "x", item._2._2 ) ) + appendToneData = false + comma = "," + } + sb.append( comma + serializeList( item._1, item._2._3 ) ) + comma="," + }) + sb.append("]") + // logInfo("Serialized size: " + value.size + ". Tone json: " + sb) + log.info("Serialized size: " + value.size + ". Tone json: " + sb) + sb.toString() + } +} diff --git a/streaming-twitter/PixiedustStreamingTwitter.scala b/streaming-twitter/PixiedustStreamingTwitter.scala new file mode 100644 index 00000000..7a87df10 --- /dev/null +++ b/streaming-twitter/PixiedustStreamingTwitter.scala @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ibm.cds.spark.samples + +import scala.collection.mutable._ +import com.ibm.pixiedust.ChannelReceiver +// import org.apache.spark.Logging +import org.apache.log4j.Logger +import org.apache.log4j.Level +import org.apache.spark.SparkContext +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.Row +import com.ibm.cds.spark.samples.config.DemoConfig +import org.apache.spark.streaming.Seconds +import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.types.DoubleType +import org.http4s.client.blaze.PooledHttp1Client +import org.apache.spark.sql.types.StructField +import org.apache.spark.sql.types.StringType +import com.google.common.base.CharMatcher +import com.ibm.couchdb.CouchDb +import com.ibm.couchdb.TypeMapping +import com.ibm.couchdb.CouchDbApi +import org.apache.spark.sql.SQLContext +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted +import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted +import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted +import org.apache.spark.SparkConf +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.HashPartitioner +import twitter4j.Status +// import org.codehaus.jettison.json.JSONObject +import org.json.JSONObject +import org.apache.spark.AccumulableParam +import org.apache.spark.streaming.StreamingContextState +import org.apache.spark.sql.DataFrame + +/* @author dtaieb + * Twitter+Watson sentiment analysis app powered by Pixiedust + */ +object PixiedustStreamingTwitter extends ChannelReceiver() { + var ssc: StreamingContext = null + var workingRDD: RDD[Row] = null + //Hold configuration key/value pairs + lazy val config = new DemoConfig + lazy val logger: Logger = Logger.getLogger( "com.ibm.cds.spark.samples.PixiedustStreamingTwitter" ) + + val BEGINSTREAM = "@BEGINSTREAM@" + val ENDSTREAM = "@ENDSTREAM@" + + def sendLog(s:String){ + send("log", s) + } + + //Wrapper api for Notebook access + def setConfig(key:String, value:String){ + config.setConfig(key, value) + } + + //main method invoked when running as a standalone Spark Application + def main(args: Array[String]) { + val conf = new SparkConf().setAppName("Pixiedust Spark Streaming Twitter Demo") + val sc = new SparkContext(conf) + startStreaming(); + } + + def createTwitterDataFrames(sqlContext: SQLContext) : DataFrame = { + if ( workingRDD == null || workingRDD.count <= 0 ){ + println("No data receive. 
Please start the Twitter stream again to collect data") + return null + } + + sqlContext.createDataFrame( workingRDD, schemaTweets ) + } + + class PixiedustStreamingListener extends org.apache.spark.streaming.scheduler.StreamingListener { + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { + sendLog("Receiver Started: " + receiverStarted.receiverInfo.name ) + //Signal the frontend that we started streaming + sendLog(BEGINSTREAM) + } + + override def onReceiverError(receiverError: StreamingListenerReceiverError) { + sendLog("Receiver Error: " + receiverError.receiverInfo.lastError) + } + + override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { + sendLog("Receiver Stopped: " + receiverStopped.receiverInfo.name) + sendLog("Reason: " + receiverStopped.receiverInfo.lastError + " : " + receiverStopped.receiverInfo.lastErrorMessage) + //signal the front end that we're done streaming + sendLog(ENDSTREAM) + } + + override def onBatchStarted(batchStarted: StreamingListenerBatchStarted){ + sendLog("Batch started with " + batchStarted.batchInfo.numRecords + " records") + } + + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted){ + sendLog("Batch completed with " + batchCompleted.batchInfo.numRecords + " records"); + } + } + + val reuseCheckpoint = false; + + def startStreaming(){ + val sc = SparkContext.getOrCreate + sendLog("Starting twitter stream"); + if ( ssc != null ){ + sendLog("Twitter Stream already running"); + sendLog("Please use stopTwitterStreaming() first and try again"); + return; + } + + if ( !config.validateConfiguration() ){ + sendLog("Unable to validate config") + sendLog(ENDSTREAM) + return; + } + + Logger.getLogger("org.apache.spark").setLevel(Level.OFF) + + //Set the hadoop configuration if needed + val checkpointDir = config.getConfig( DemoConfig.CHECKPOINT_DIR_KEY ); + if ( checkpointDir.startsWith("swift") ){ + println("Setting hadoop configuration for swift container") + config.set_hadoop_config(sc) + } + + workingRDD = sc.emptyRDD + + if ( !reuseCheckpoint ){ + ssc = createStreamingContextAndRunAnalytics(sc); + }else{ + ssc = StreamingContext.getOrCreate( + config.getConfig( DemoConfig.CHECKPOINT_DIR_KEY ), + () => { + createStreamingContextAndRunAnalytics(sc); + }, + sc.hadoopConfiguration, + true + ); + } + + ssc.addStreamingListener( new PixiedustStreamingListener ) + + ssc.start() + + sendLog("Twitter stream started"); + } + + def stopStreaming(){ + if ( ssc == null){ + sendLog("No Twitter stream to stop"); + return; + } + + sendLog("Stopping Twitter stream. 
Please wait this may take a while") + ssc.stop(stopSparkContext = false, stopGracefully = false) + ssc = null + sendLog("Twitter stream stopped"); + } + + def createStreamingContextAndRunAnalytics(sc:SparkContext):StreamingContext={ + //Broadcast the config to each worker node + val broadcastVar = sc.broadcast( config.toImmutableMap ) + ssc = new StreamingContext( sc, Seconds(5) ) + ssc.checkpoint(config.getConfig( DemoConfig.CHECKPOINT_DIR_KEY )); + val stream = org.apache.spark.streaming.twitter.TwitterUtils.createStream( ssc, None ); + runAnalytics(sc, broadcastVar, stream) + ssc; + } + + def runAnalytics(sc:SparkContext, broadcastVar: Broadcast[scala.collection.immutable.Map[String,String]], stream:DStream[Status]){ + val keys = broadcastVar.value.get("tweets.key").get.split(","); + val tweets = stream.filter { status => + Option(status.getUser).flatMap[String] { + u => Option(u.getLang) + }.getOrElse("").startsWith("en") && CharMatcher.ASCII.matchesAllOf(status.getText) && ( keys.isEmpty || keys.exists{key => status.getText.toLowerCase.contains(key.toLowerCase)}) + } + + val tweetAccumulator = sc.accumulable(Array[(String,String)]())(TweetsAccumulatorParam) + + new Thread( new Runnable() { + def run(){ + try{ + while(ssc!=null && ssc.getState() != StreamingContextState.STOPPED ){ + val accuValue = tweetAccumulator.value + if ( accuValue.size > 0 ){ + tweetAccumulator.setValue(Array[(String,String)]() ) + accuValue.foreach( v => send(v._1, v._2) ) + } + Thread.sleep( 1000L ) + } + System.out.println("Stopping the accumulator thread") + }catch{ + case e:Throwable => e.printStackTrace() + } + } + },"Accumulator").start + + val rowTweets = tweets.map(status=> { + lazy val client = PooledHttp1Client() + val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar ) + var scoreMap : Map[String, Double] = Map() + if ( sentiment != null ){ + // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ + for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){ + scoreMap.put( tone.tone_id, (BigDecimal(tone.score).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 ) + } + // } + } + + var jsonSentiment="{"; + scoreMap.foreach( t => jsonSentiment = jsonSentiment + (if (jsonSentiment.length() == 1) "" else ",") + "\"" + t._1 + "\":" + t._2) + jsonSentiment += "}"; + val sendValue:String = "{\"author\": \"" + + status.getUser.getName + + "\", \"userid\":\"" + status.getUser.getScreenName + + "\", \"pic\":\"" + status.getUser.getOriginalProfileImageURLHttps + + "\",\"text\":" + JSONObject.quote( status.getText ) + ", \"sentiment\": " + jsonSentiment + "}" + + tweetAccumulator+=("tweets",sendValue) + + EnrichedTweet( + status.getUser.getName, + status.getUser.getScreenName, + status.getCreatedAt.toString, + status.getUser.getLang, + status.getText, + Option(status.getGeoLocation).map{ _.getLatitude}.getOrElse(0.0), + Option(status.getGeoLocation).map{ _.getLongitude}.getOrElse(0.0), + scoreMap + ) + }) + + rowTweets.foreachRDD( rdd => { + if( rdd.count > 0 ){ + workingRDD = SparkContext.getOrCreate().parallelize( rdd.map( t => t.toRow() ).collect()).union( workingRDD ) + } + }) + + val delimTagTone = "-%!" 
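+      //These delimiters flatten (hashtag, tone, score) triples into plain strings so they can travel
+      //through the map/reduce stages below as keys, e.g. "#spark" + joy + 75.0 -> "#spark-%!joy:%@75.0"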
+ val delimToneScore = ":%@" + val statsStream = rowTweets.map { eTweet => ("total_tweets", 1L) } + .reduceByKey( _+_ ) + .updateStateByKey( (a:scala.collection.Seq[Long], b:Option[Long] ) => { + var runningCount=b.getOrElse(0L) + a.foreach { v => runningCount=runningCount+v } + Some(runningCount) + }) + statsStream.foreachRDD( rdd =>{ + send("TweetProcessed", TweetsMetricJsonSerializer.serialize(rdd.collect())) + }) + + val metricsStream = rowTweets.flatMap { eTweet => { + val retList = ListBuffer[String]() + for ( tag <- eTweet.text.split("\\s+") ){ + if ( tag.startsWith( "#") && tag.length > 1 ){ + for ( tone <- Option( eTweet.sentimentScores.keys ).getOrElse( Seq() ) ){ + retList += (tag + delimTagTone + tone + delimToneScore + eTweet.sentimentScores.getOrElse( tone, 0.0)) + } + } + } + retList.toList + }} + .map { fullTag => { + val split = fullTag.split(delimToneScore); + (split(0), split(1).toFloat) + }} + .combineByKey( + (x:Float) => (x,1), + (x:(Float,Int), y:Float) => (x._1 + y, x._2+1), + (x:(Float,Int),y:(Float,Int)) => (x._1 + y._1, x._2 + y._2), + new HashPartitioner(sc.defaultParallelism) + ) + .map[(String,(Long/*count*/, List[(String, Double)]))]{ t => { + val key = t._1; + val ab = t._2; + val split = key.split(delimTagTone) + (split(0), (ab._2, List((split(1), BigDecimal(ab._1/ab._2).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble )))) + }} + .reduceByKey( (t,u) => (t._1+u._1, (t._2 ::: u._2).sortWith( (l,r) => l._1.compareTo( r._1 ) < 0 ))) + .mapValues( (item:(Long, List[(String,Double)])) => { + val unzip = item._2.unzip + (item._1/(item._2.size), unzip._1, unzip._2) + }) + .updateStateByKey( (a:scala.collection.Seq[(Long, List[String], List[Double])], b: Option[(Long, List[String], List[Double])]) => { + val safeB = b.getOrElse( (0L, List(), List() ) ) + var listTones = safeB._2 + var listScores = safeB._3 + var count = safeB._1 + for( item <- a ){ + count += item._1 + listScores = listScores.zipAll( item._3, 0.0, 0.0).map{ case(a,b)=>(a+b)/2 }.toList + listTones = item._2 + } + + Some( (count, listTones, listScores) ) + }) + + metricsStream.print + + metricsStream.foreachRDD( rdd =>{ + val topHashTags = rdd.sortBy( f => f._2._1, false ).take(5) + if ( !topHashTags.isEmpty){ + tweetAccumulator+=("topHashtags", TweetsMetricJsonSerializer.serialize(topHashTags.map( f => (f._1, f._2._1 )))) + tweetAccumulator+=("toneScores", ToneScoreJsonSerializer.serialize(topHashTags)) + } + }) + + } +} + +object TweetsAccumulatorParam extends AccumulableParam[Array[(String,String)], (String,String)]{ + def zero(initialValue:Array[(String,String)]):Array[(String,String)] = { + Array() + } + + def addInPlace(s1:Array[(String,String)], s2:Array[(String,String)]):Array[(String,String)] = { + s1 ++ s2 + } + + def addAccumulator(current:Array[(String,String)], s:(String,String)):Array[(String,String)] = { + current :+ s + } +} diff --git a/streaming-twitter/StatusSerializer.scala b/streaming-twitter/StatusSerializer.scala new file mode 100644 index 00000000..67446eca --- /dev/null +++ b/streaming-twitter/StatusSerializer.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.cds.spark.samples + +import java.io.ObjectOutputStream +import java.io.ByteArrayOutputStream +import org.apache.kafka.common.serialization.Serializer +import twitter4j.Status + +/** + * @author dtaieb + */ +class StatusSerializer extends Serializer[Status]{ + def configure( props: java.util.Map[String, _], isKey: Boolean) = { + + } + + def close(){ + + } + + def serialize(topic: String, value: Status ): Array[Byte] = { + val baos = new ByteArrayOutputStream(1024) + val oos = new ObjectOutputStream(baos) + oos.writeObject( value ) + oos.close + baos.toByteArray() + } +} \ No newline at end of file diff --git a/streaming-twitter/StreamingListener.scala b/streaming-twitter/StreamingListener.scala new file mode 100644 index 00000000..8fdf0461 --- /dev/null +++ b/streaming-twitter/StreamingListener.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ibm.cds.spark.samples + +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted +import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted +import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted + +/** + * @author dtaieb + */ +class StreamingListener extends org.apache.spark.streaming.scheduler.StreamingListener { + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { + println("Receiver Started: " + receiverStarted.receiverInfo.name ) + } + + override def onReceiverError(receiverError: StreamingListenerReceiverError) { + println("Receiver Error: " + receiverError.receiverInfo.lastError) + } + + override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { + println("Receiver Stopped: " + receiverStopped.receiverInfo.name) + println("Reason: " + receiverStopped.receiverInfo.lastError + " : " + receiverStopped.receiverInfo.lastErrorMessage) + } + + override def onBatchStarted(batchStarted: StreamingListenerBatchStarted){ + println("Batch started with " + batchStarted.batchInfo.numRecords + " records") + } + + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted){ + println("Batch completed with " + batchCompleted.batchInfo.numRecords + " records"); + } +} \ No newline at end of file diff --git a/streaming-twitter/StreamingTwitter.scala b/streaming-twitter/StreamingTwitter.scala new file mode 100644 index 00000000..8ae62957 --- /dev/null +++ b/streaming-twitter/StreamingTwitter.scala @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ibm.cds.spark.samples + +import scala.collection.mutable._ +import org.apache.commons.lang3.StringEscapeUtils +import org.apache.log4j.Level +import org.apache.log4j.Logger +import org.apache.spark.Accumulator +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext +import org.apache.spark.streaming._ +import org.apache.spark.streaming.dstream._ +import org.http4s._ +import org.http4s.Http4s._ +import org.http4s.Status._ +import org.http4s.client.Client +import org.http4s.client.blaze.PooledHttp1Client +import org.http4s.headers.Authorization +import com.ibm.couchdb._ +import scalaz._ +import scalaz.concurrent.Task +import twitter4j.Status +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ +import org.apache.spark.sql.DataFrame +import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.EmptyRDD +import com.google.common.base.CharMatcher +import scala.math.BigDecimal +import com.ibm.cds.spark.samples.config.DemoConfig +import com.ibm.cds.spark.samples.ToneAnalyzer.ToneCategory +// import org.apache.spark.Logging + + + + +/** + * @author dtaieb + */ +object StreamingTwitter { + val log = Logger.getLogger(getClass.getName) + var ssc: StreamingContext = null + var sqlContext: SQLContext = null + var workingRDD: RDD[Row] = null + var schemaTweets : StructType = null + val logger: Logger = Logger.getLogger( "com.ibm.cds.spark.samples.StreamingTwitter" ) + + //main method invoked when running as a standalone Spark Application + def main(args: Array[String]) { + + val conf = new SparkConf().setAppName("Spark Streaming Twitter Demo") + val sc = new SparkContext(conf) + startTwitterStreaming(sc, Seconds(10)); + } + + //Hold configuration key/value pairs + val config = new DemoConfig + + //Wrapper api for Notebook access + def setConfig(key:String, value:String){ + config.setConfig(key, value) + } + + def startTwitterStreaming( sc: SparkContext, stopAfter: Duration = Seconds(0) ){ + println("Starting twitter stream"); + if ( ssc != null ){ + println("Twitter Stream already running"); + println("Please use stopTwitterStreaming() first and try again"); + return; + } + + if ( !config.validateConfiguration(DemoConfig.CHECKPOINT_DIR_KEY) ){ + println("Unable to validate config") + return; + } + + Logger.getLogger("org.apache.spark").setLevel(Level.OFF) + + workingRDD = sc.emptyRDD + //Broadcast the config to each worker node + val broadcastVar = sc.broadcast(config.toImmutableMap) + + var canStopTwitterStream = true + var batchesProcessed=0 + + ssc = new StreamingContext( sc, Seconds(5) ) + + ssc.addStreamingListener( new StreamingListener ) + + try{ + sqlContext = new SQLContext(sc) + val keys = config.getConfig("tweets.key").split(","); + val stream = org.apache.spark.streaming.twitter.TwitterUtils.createStream( ssc, None ); + + if ( schemaTweets == null ){ + val schemaString = "author userid date lang text lat:double long:double" + schemaTweets = + StructType( + schemaString.split(" ").map( + fieldName => { + val ar = fieldName.split(":") + StructField( + ar.lift(0).get, + ar.lift(1).getOrElse("string") match{ + case "int" => IntegerType + case "double" => DoubleType + case _ => StringType + }, + true) + } + ).union( + ToneAnalyzer.sentimentFactors.map( f => StructField( f._1, DoubleType )).toArray[StructField] + ) + ) + } + val tweets = stream.filter { status => + Option(status.getUser).flatMap[String] { + u => Option(u.getLang) + }.getOrElse("").startsWith("en") && 
CharMatcher.ASCII.matchesAllOf(status.getText) && ( keys.isEmpty || keys.exists{status.getText.contains(_)}) + } + + lazy val client = PooledHttp1Client() + val rowTweets = tweets.map(status=> { + val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar ) + + var colValues = Array[Any]( + status.getUser.getName, //author + status.getUser.getScreenName, //Userid + status.getCreatedAt.toString, //date + status.getUser.getLang, //Lang + status.getText, //text + Option(status.getGeoLocation).map{ _.getLatitude}.getOrElse(0.0), //lat + Option(status.getGeoLocation).map{_.getLongitude}.getOrElse(0.0) //long + //exception + ) + + var scoreMap : Map[String, Double] = Map() + if ( sentiment != null ){ + // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ + for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){ + scoreMap.put( tone.tone_id, tone.score ) + } + // } + } + + colValues = colValues ++ ToneAnalyzer.sentimentFactors.map { f => (BigDecimal(scoreMap.get(f._2).getOrElse(0.0)).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 } + //Return [Row, (sentiment, status)] + (Row(colValues.toArray:_*),(sentiment, status)) + }) + + rowTweets.foreachRDD( rdd => { + if(batchesProcessed==0){ + canStopTwitterStream=false + } + try{ + if( rdd.count > 0 ){ + batchesProcessed += 1 + workingRDD = sc.parallelize( rdd.map( t => t._1 ).collect()).union( workingRDD ) + + val saveToCloudant = broadcastVar.value.get("cloudant.save").get.toBoolean + if ( saveToCloudant ){ + rdd.foreachPartition { iterator => + var db: CouchDbApi = null; + val couch = CouchDb( broadcastVar.value.get("cloudant.hostName").get, + broadcastVar.value.get("cloudant.port").get.toInt, + broadcastVar.value.get("cloudant.https").get.toBoolean, + broadcastVar.value.get("cloudant.username").get, + broadcastVar.value.get("cloudant.password").get + ); + val dbName = "spark-streaming-twitter" + couch.dbs.get(dbName).attemptRun match{ + case -\/(e) => logger.trace("Couch Database does not exist, creating it now"); couch.dbs.create(dbName).run + case \/-(a) => println("Connected to cloudant db " + dbName ) + } + val typeMapping = TypeMapping(classOf[ToneAnalyzer.Tweet] -> "Tweet") + db = couch.db(dbName, typeMapping) + iterator.foreach( t => { + saveTweetToCloudant( client, db, t._2._2, t._2._1 ) + } + ) + } + } + } + }catch{ + case e: InterruptedException=>//Ignore + // case e: Exception => logError(e.getMessage, e ) + case e: Exception => log.error(e.getMessage, e ) + }finally{ + canStopTwitterStream = true + } + }) + + }catch{ + // case e : Exception => logError(e.getMessage, e ) + case e : Exception => log.error(e.getMessage, e ) + return + } + ssc.start() + + println("Twitter stream started"); + println("Tweets are collected real-time and analyzed") + println("To stop the streaming and start interacting with the data use: StreamingTwitter.stopTwitterStreaming") + + if ( !stopAfter.isZero ){ + //Automatically stop it after 10s + new Thread( new Runnable { + var displayMessage = true; + def run(){ + Thread.sleep( stopAfter.milliseconds ) + var loop = true + while(loop){ + if (canStopTwitterStream){ + stopTwitterStreaming + loop = false + }else{ + if ( displayMessage ){ + displayMessage = false + println("Received directive to stop twitter Stream: Waiting for already received tweets to be processed...") + } + Thread.sleep(5000L) + } + } + } + }).start + } + } + + def saveTweetToCloudant(client: Client, db: CouchDbApi, status:Status, sentiment: ToneAnalyzer.ToneCategory) : Status = { + 
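//Persists the tweet text, geo coordinates and tone scores as a document in the "spark-streaming-twitter"
+    //Cloudant database; returns the incoming status unchanged
+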
+    if ( db != null){
+      logger.trace("Creating new Tweet in Couch Database " + status.getText())
+      val task:Task[Res.DocOk] = db.docs.create(
+          ToneAnalyzer.Tweet(
+            status.getUser().getName,
+            status.getCreatedAt().toString(),
+            status.getUser().getLang(),
+            status.getText(),
+            ToneAnalyzer.Geo(
+              Option(status.getGeoLocation).map{ _.getLatitude}.getOrElse(0.0),
+              Option(status.getGeoLocation).map{_.getLongitude}.getOrElse(0.0)
+            ),
+            sentiment
+          )
+      )
+
+      // Execute the actions and process the result
+      task.attemptRun match {
+        // case -\/(e) => logError(e.getMessage, e );
+        case -\/(e) => log.error(e.getMessage, e );
+        case \/-(a) => logger.trace("Successfully created new Tweet in Couch Database " + status.getText() )
+      }
+    }
+
+    status
+  }
+
+  def createTwitterDataFrames(sc: SparkContext) : (SQLContext, DataFrame) = {
+    if ( workingRDD.count <= 0 ){
+      println("No data received. Please start the Twitter stream again to collect data")
+      return null
+    }
+
+    try{
+      val df = sqlContext.createDataFrame( workingRDD, schemaTweets )
+      df.registerTempTable("tweets")
+
+      println("A new table named tweets with " + df.count() + " records has been created and can be accessed through the SQLContext variable")
+      println("Here's the schema for tweets")
+      df.printSchema()
+
+      (sqlContext, df)
+    }catch{
+      // case e: Exception => {logError(e.getMessage, e ); return null}
+      case e: Exception => {log.error(e.getMessage, e ); return null}
+    }
+  }
+
+  def stopTwitterStreaming(){
+    if ( ssc == null){
+      println("No Twitter stream to stop");
+      return;
+    }
+
+    println("Stopping Twitter stream. Please wait, this may take a while")
+    ssc.stop(stopSparkContext = false, stopGracefully = false)
+    ssc = null
+    println("Twitter stream stopped");
+
+    println( "You can now create a sqlContext and DataFrame with " + workingRDD.count + " Tweets created.
Sample usage: ") + println("val (sqlContext, df) = com.ibm.cds.spark.samples.StreamingTwitter.createTwitterDataFrames(sc)") + println("df.printSchema") + println("sqlContext.sql(\"select author, text from tweets\").show") + } +} diff --git a/streaming-twitter/ToneAnalyzer.scala b/streaming-twitter/ToneAnalyzer.scala new file mode 100644 index 00000000..ad43f3b7 --- /dev/null +++ b/streaming-twitter/ToneAnalyzer.scala @@ -0,0 +1,100 @@ +package com.ibm.cds.spark.samples + +import org.http4s.EntityEncoder +import org.http4s.Uri +import org.http4s.client.Client +import org.http4s.Request +import org.http4s.BasicCredentials +import org.http4s.Header +import org.http4s.Headers +import org.http4s.Method +import org.http4s.headers.Authorization +import org.apache.log4j.Logger +import org.apache.spark.broadcast.Broadcast +// import org.apache.spark.Logging +import org.apache.log4j.Logger +// import scala.util.parsing.json.JSON +// import org.codehaus.jettison.json.JSONObject +import org.json.JSONObject +import scala.util.parsing.json._ + +/** + * @author dtaieb + */ + +object ToneAnalyzer { + val log = Logger.getLogger(getClass.getName) + + val sentimentFactors = Array( + ("Anger","anger"), + ("Disgust","disgust"), + ("Fear","fear"), + ("Joy","joy"), + ("Sadness","sadness"), + ("Analytical","analytical"), + ("Confident","confident"), + ("Tentative","tentative"), + ("Openness","openness_big5"), + ("Conscientiousness","conscientiousness_big5"), + ("Extraversion","extraversion_big5"), + ("Agreeableness","agreeableness_big5"), + ("EmotionalRange","neuroticism_big5") + ) + + //Class models for Sentiment JSON + // case class DocumentTone( document_tone: Sentiment ) + // case class Sentiment(tone_categories: Seq[ToneCategory]); + // case class ToneCategory(category_id: String, category_name: String, tones: Seq[Tone]); + case class DocumentTone(document_tone: ToneCategory) + case class ToneCategory(tones: Seq[Tone]); + case class Tone(score: Double, tone_id: String, tone_name: String) +// case class Sentiment( scorecard: String, children: Seq[Tone] ) +// case class Tone( name: String, id: String, children: Seq[ToneResult]) +// case class ToneResult(name: String, id: String, word_count: Double, normalized_score: Double, raw_score: Double, linguistic_evidence: Seq[LinguisticEvidence] ) +// case class LinguisticEvidence( evidence_score: Double, word_count: Double, correlation: String, words : Seq[String]) + + case class Geo( lat: Double, long: Double ) + case class Tweet(author: String, date: String, language: String, text: String, geo : Geo, sentiment : ToneCategory) + + def computeSentiment( client: Client, status:StatusAdapter, broadcastVar: Broadcast[Map[String,String]] ) : ToneCategory = { + // logTrace("Calling sentiment from Watson Tone Analyzer: " + status.text) + log.trace("Calling sentiment from Watson Tone Analyzer: " + status.text) + try{ + //Get Sentiment on the tweet + val sentimentResults: String = + EntityEncoder[String].toEntity("{\"text\": " + JSONObject.quote( status.text ) + "}" ).flatMap { + entity => + val s = broadcastVar.value.get("watson.tone.url").get + "/v3/tone?version=" + broadcastVar.value.get("watson.api.version").get + val toneuri: Uri = Uri.fromString( s ).getOrElse( null ) + client( + Request( + method = Method.POST, + uri = toneuri, + headers = Headers( + Authorization( + BasicCredentials(broadcastVar.value.get("watson.tone.username").get, broadcastVar.value.get("watson.tone.password").get) + ), + Header("Accept", "application/json"), + Header("Content-Type", 
"application/json; charset=utf-8") + ), + body = entity.body + ) + ).flatMap { response => + if (response.status.code == 200 ) { + response.as[String] + } else { + println( "Error received from Watson Tone Analyzer. Code : " + response.status.code + " reason: " + response.status.reason ) + null + } + } + }.run + + upickle.read[DocumentTone](sentimentResults).document_tone + }catch{ + case e:Throwable => { + e.printStackTrace() + null + } + } + } +} diff --git a/streaming-twitter/TwitterAdapter.scala b/streaming-twitter/TwitterAdapter.scala new file mode 100644 index 00000000..221ae278 --- /dev/null +++ b/streaming-twitter/TwitterAdapter.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.cds.spark.samples + +import java.io.ObjectInputStream +import java.io.ByteArrayInputStream +import scala.util.parsing.json.JSON +import org.apache.kafka.common.serialization.Deserializer +import twitter4j.Status + +/** + * @author dtaieb + * Deserialization adapters for Twitter4J Status + */ + +case class StatusAdapter(userName:String, userId: String, userLang: String,createdAt:String,text:String, long:Double, lat:Double); + +object StatusAdapter{ + implicit def statusAdapterWrapper(status: Status) = + StatusAdapter( + status.getUser.getName, + status.getUser.getScreenName, + status.getUser.getLang, + status.getCreatedAt.toString, + status.getText, + Option(status.getGeoLocation).map{ _.getLongitude}.getOrElse(0.0), + Option(status.getGeoLocation).map{ _.getLatitude}.getOrElse(0.0) + ) +} + +class StatusDeserializer extends Deserializer[StatusAdapter]{ + def configure( props: java.util.Map[String, _], isKey: Boolean) = { + + } + + def close(){ + + } + + def deserialize(topic: String, data: Array[Byte] ): StatusAdapter = { + try{ + val bais = new ByteArrayInputStream( data ) + var ois:ObjectInputStream = null + try{ + ois = new ObjectInputStream( bais ) + ois.readObject().asInstanceOf[Status] + }finally{ + if (bais != null ){ + bais.close + } + if ( ois != null ){ + ois.close + } + } + }catch{ + case e:Throwable=>{ + val jsonObject = JSON.parseFull( new String(data) ).getOrElse(Map.empty).asInstanceOf[Map[String, Any]] + val user=jsonObject.get("user").getOrElse( Map.empty ).asInstanceOf[Map[String,Any]] + val geo = Option(jsonObject.get("geo").orNull).getOrElse(Map.empty).asInstanceOf[Map[String,Any]] + StatusAdapter( + user.get("name").getOrElse("").asInstanceOf[String], + user.get("userid").getOrElse("").asInstanceOf[String], + user.get("lang").getOrElse("").asInstanceOf[String], + jsonObject.get("created_at").getOrElse("").asInstanceOf[String], + jsonObject.get("text").getOrElse("").asInstanceOf[String], + geo.get("long").getOrElse(0.0).asInstanceOf[Double], + geo.get("lat").getOrElse(0.0).asInstanceOf[Double] + ) + 
} + } + } +} \ No newline at end of file diff --git a/streaming-twitter/build.sbt b/streaming-twitter/build.sbt index 8da1465d..32d75c64 100644 --- a/streaming-twitter/build.sbt +++ b/streaming-twitter/build.sbt @@ -2,28 +2,36 @@ name := "streaming-twitter" version := "1.6" -scalaVersion := "2.10.4" +scalaVersion := "2.11.4" libraryDependencies ++= { - val sparkVersion = "1.6.0" + val sparkVersion = "2.1.1" Seq( "org.apache.spark" %% "spark-core" % sparkVersion % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", - "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion, + // "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion, + "org.apache.bahir" %% "spark-streaming-twitter" % sparkVersion, "org.apache.spark" %% "spark-repl" % sparkVersion % "provided", - "com.ibm" %% "couchdb-scala" % "0.5.3", + "com.ibm" %% "couchdb-scala" % "0.5.1", "org.apache.kafka" % "kafka-log4j-appender" % "0.9.0.0", "org.apache.kafka" % "kafka-clients" % "0.9.0.0", "org.apache.kafka" %% "kafka" % "0.9.0.0", - "com.google.guava" % "guava" % "14.0.1" + "com.google.guava" % "guava" % "14.0.1", + "org.twitter4j" % "twitter4j-core" % "4.0.4", + "org.twitter4j" % "twitter4j-stream" % "4.0.4", + "org.json" % "json" % "20180130" ) } assemblyMergeStrategy in assembly := { - case PathList("org", "apache", "spark", xs @ _*) => MergeStrategy.first - case PathList("scala", xs @ _*) => MergeStrategy.discard - case PathList("com", "ibm", "pixiedust", xs @ _*) => MergeStrategy.discard + case PathList("org", "apache", "spark", xs @ _*) => MergeStrategy.first + case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first + case PathList("com", "google", xs @ _*) => MergeStrategy.first + case PathList("org", "apache", xs @ _*) => MergeStrategy.first + case PathList("javax", "xml", xs @ _*) => MergeStrategy.first + case PathList("scala", xs @ _*) => MergeStrategy.discard + case PathList("com", "ibm", "pixiedust", xs @ _*) => MergeStrategy.discard case PathList("META-INF", "maven", "org.slf4j", xs @ _* ) => MergeStrategy.first case x => val oldStrategy = (assemblyMergeStrategy in assembly).value @@ -32,6 +40,7 @@ assemblyMergeStrategy in assembly := { unmanagedBase <<= baseDirectory { base => base / "lib" } +//Important line below. 
This strips out all the scala dependencies and shrinks down your jar into skinny jar assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false) resolvers += "scalaz-bintray" at "https://dl.bintray.com/scalaz/releases" From f44550784efb8be6e277fce406a628b79ac3fb2e Mon Sep 17 00:00:00 2001 From: Thomas Hornung Date: Fri, 16 Mar 2018 10:37:09 +0100 Subject: [PATCH 2/2] sorry wrong previous commit --- streaming-twitter/KafkaProducerTest.scala | 157 -------- .../MessageHubStreamingTwitter.scala | 381 ------------------ .../PixiedustStreamingTwitter.scala | 346 ---------------- streaming-twitter/StatusSerializer.scala | 44 -- streaming-twitter/StreamingListener.scala | 50 --- streaming-twitter/StreamingTwitter.scala | 310 -------------- streaming-twitter/ToneAnalyzer.scala | 100 ----- streaming-twitter/TwitterAdapter.scala | 87 ---- .../cds/spark/samples/KafkaProducerTest.scala | 19 +- .../samples/MessageHubStreamingTwitter.scala | 41 +- .../samples/PixiedustStreamingTwitter.scala | 15 +- .../cds/spark/samples/StreamingTwitter.scala | 27 +- .../ibm/cds/spark/samples/ToneAnalyzer.scala | 29 +- 13 files changed, 80 insertions(+), 1526 deletions(-) delete mode 100644 streaming-twitter/KafkaProducerTest.scala delete mode 100644 streaming-twitter/MessageHubStreamingTwitter.scala delete mode 100644 streaming-twitter/PixiedustStreamingTwitter.scala delete mode 100644 streaming-twitter/StatusSerializer.scala delete mode 100644 streaming-twitter/StreamingListener.scala delete mode 100644 streaming-twitter/StreamingTwitter.scala delete mode 100644 streaming-twitter/ToneAnalyzer.scala delete mode 100644 streaming-twitter/TwitterAdapter.scala diff --git a/streaming-twitter/KafkaProducerTest.scala b/streaming-twitter/KafkaProducerTest.scala deleted file mode 100644 index ea531b86..00000000 --- a/streaming-twitter/KafkaProducerTest.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.ibm.cds.spark.samples - -import java.io.ByteArrayInputStream -import java.io.ByteArrayOutputStream -import java.io.ObjectInputStream -import java.io.ObjectOutputStream -import java.util.concurrent.TimeUnit -import scala.collection.JavaConversions.mapAsJavaMap -import scala.collection.JavaConversions.seqAsJavaList -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.serialization.Deserializer -import org.apache.kafka.common.serialization.Serializer -import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.log4j.Level -import org.apache.log4j.Logger -import com.ibm.cds.spark.samples.config.MessageHubConfig -import twitter4j.StallWarning -import twitter4j.Status -import twitter4j.StatusDeletionNotice -import twitter4j.StatusListener -import twitter4j.TwitterStreamFactory -import twitter4j.TwitterStream -import scala.util.parsing.json.JSON -import java.io.InputStream -import com.ibm.cds.spark.samples.config.DemoConfig -// import org.apache.spark.Logging -import org.apache.log4j.Logger - - -/** - * @author dtaieb - */ -object KafkaProducerTest { - //Very verbose, enable only if necessary - //Logger.getLogger("org.apache.kafka").setLevel(Level.ALL) - //Logger.getLogger("kafka").setLevel(Level.ALL) - val log = Logger.getLogger(getClass.getName) - - var twitterStream : TwitterStream = _; - - def main(args: Array[String]): Unit = { - createTwitterStream(); - } - - def createTwitterStream(props: DemoConfig=null):TwitterStream = { - if( twitterStream != null){ - println("Twitter Stream already running. Please call closeTwitterStream first"); - return twitterStream; - } - var kafkaProps:MessageHubConfig = null; - if ( props == null ){ - kafkaProps = new MessageHubConfig - }else{ - kafkaProps = props.cloneConfig - } - kafkaProps.setValueSerializer[StatusSerializer] - kafkaProps.validateConfiguration("watson.tone.") - kafkaProps.createTopicsIfNecessary( kafkaProps.getConfig(MessageHubConfig.KAFKA_TOPIC_TWEETS ) ) - val kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer[java.lang.String, Status]( kafkaProps.toImmutableMap() ); - - twitterStream = new TwitterStreamFactory().getInstance(); - twitterStream.addListener( new StatusListener(){ - var lastSent:Long = 0; - def onStatus(status: Status){ - if ( lastSent == 0 || System.currentTimeMillis() - lastSent > 200L){ - lastSent = System.currentTimeMillis() - // logInfo("Got a status " + status.getText ) - log.info("Got a status " + status.getText ) - val producerRecord = new ProducerRecord(kafkaProps.getConfig(MessageHubConfig.KAFKA_TOPIC_TWEETS ), "tweet", status ) - try{ - val metadata = kafkaProducer.send( producerRecord ).get(2000, TimeUnit.SECONDS); - // logInfo("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset ) - log.info("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset ) - }catch{ - case e:Throwable => e.printStackTrace - } - } - } - def onDeletionNotice( notice: StatusDeletionNotice){ - - } - def onTrackLimitationNotice( numLimitation : Int){ - println("Received track limitation notice from Twitter: " + numLimitation) - } - - def onException( e: Exception){ - println("Unexpected error from twitterStream: " + e.getMessage); - // logError(e.getMessage, e) - log.error(e.getMessage, e) - } - - def onScrubGeo(lat: Long, long: Long ){ - - } - - def onStallWarning(warning: StallWarning ){ - - } - }) - - //Start twitter stream sampling - 
twitterStream.sample(); - - println("Twitter stream started. Tweets will flow to MessageHub instance. Please call closeTwitterStream to stop the stream") - twitterStream - } - - def closeTwitterStream(){ - if ( twitterStream==null){ - println("Nothing to close. Twitter stream has not been started") - }else{ - println("Stopping twitter stream"); - twitterStream.shutdown() - twitterStream=null - println("Twitter Stream stopped") - } - } -} - -object KafkaConsumerTest { - def main(args: Array[String]): Unit = { - val kafkaProps = new MessageHubConfig - kafkaProps.validateConfiguration("watson.tone.") - val kafkaConsumer = new KafkaConsumer[java.lang.String, StatusAdapter](kafkaProps.toImmutableMap, new StringDeserializer(), new StatusDeserializer()) - - kafkaConsumer.subscribe( List(kafkaProps.getConfig(MessageHubConfig.KAFKA_TOPIC_TWEETS )) ) - new Thread( new Runnable { - def run(){ - while( true ){ - Thread.sleep( 1000L ) - val it = kafkaConsumer.poll(1000L).iterator - while( it.hasNext() ){ - val record = it.next(); - println( record.value ); - } - } - } - }).start - } -} diff --git a/streaming-twitter/MessageHubStreamingTwitter.scala b/streaming-twitter/MessageHubStreamingTwitter.scala deleted file mode 100644 index 1dfa086e..00000000 --- a/streaming-twitter/MessageHubStreamingTwitter.scala +++ /dev/null @@ -1,381 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.ibm.cds.spark.samples - -import scala.BigDecimal -import scala.collection.JavaConversions.mapAsJavaMap -import scala.collection.immutable.Seq.canBuildFrom -import scala.collection.mutable.ListBuffer -import scala.collection.mutable.Map -import scala.reflect.ClassTag -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.common.serialization.StringSerializer -import org.apache.spark.HashPartitioner -import org.apache.spark.SparkConf -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row -import org.apache.spark.streaming.Duration -import org.apache.spark.streaming.Seconds -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions -import org.http4s.client.blaze.PooledHttp1Client -import com.google.common.base.CharMatcher -import com.ibm.cds.spark.samples.config.MessageHubConfig -import com.ibm.cds.spark.samples.dstream.KafkaStreaming.KafkaStreamingContextAdapter -import twitter4j.Status -import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted -import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted -import com.ibm.cds.spark.samples.config.DemoConfig -import org.apache.log4j.Level -import org.apache.log4j.Logger -import org.apache.spark.streaming.dstream.DStream -import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped -import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError -import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted -import org.apache.spark.broadcast.Broadcast -// import org.apache.spark.Logging -import org.apache.log4j.Logger - -import java.util.Arrays - -/** - * @author dtaieb - * Twitter+Watson sample app with MessageHub/Kafka - */ -object MessageHubStreamingTwitter { - val log = Logger.getLogger(getClass.getName) - - var ssc: StreamingContext = null - val reuseCheckpoint = false; - - val queue = new scala.collection.mutable.Queue[(String, String)] - - final val KAFKA_TOPIC_TOP_HASHTAGS = "topHashTags" - final val KAFKA_TOPIC_TONE_SCORES = "topHashTags.toneScores" - final val KAFKA_TOPIC_TOTAL_TWEETS_PROCESSED = "total_tweets" - - //Logger.getLogger("org.apache.kafka").setLevel(Level.ALL) - //Logger.getLogger("kafka").setLevel(Level.ALL) - // Logger.getLogger("org.apache.spark").setLevel(Level.WARN) - - def main(args: Array[String]): Unit = { - println("Printing arguments: "); - args.foreach { println } - - if(args.length>0 && System.getProperty("DEMO_CONFIG_PATH") == null ){ - //On Spark Service, input files are passed as parameters, if available, we assume first parameter is config file - System.setProperty("DEMO_CONFIG_PATH", args(0)) - } - val conf = new SparkConf().setAppName("Spark Streaming Twitter + Watson with MessageHub/Kafka Demo") - val sc = new SparkContext(conf) - startTwitterStreaming(sc); - - if(ssc!=null){ - //When running as stand alone app, we call awaitTermination to make sure the JVM doesn't exit prematurely due to the fact - //that all non-daemon threads have terminated. 
Note: Don't call awaitTermination directly from startTwitterStreaming as it can be run - //From Notebook - ssc.awaitTermination() - } - } - - //Hold configuration key/value pairs - lazy val kafkaProps = new MessageHubConfig - - //Wrapper api for Notebook access - def getConfig():DemoConfig={ - kafkaProps - } - - def startTwitterStreaming( sc: SparkContext, stopAfter: Duration = Seconds(0) ){ - if ( ssc != null ){ - println("Twitter Stream already running"); - return; - } - - kafkaProps.setValueSerializer[StringSerializer]; - - if ( !kafkaProps.validateConfiguration("twitter4j.oauth") ){ - return; - } - - //Set the hadoop configuration if needed - val checkpointDir = kafkaProps.getConfig( MessageHubConfig.CHECKPOINT_DIR_KEY ); - if ( checkpointDir.startsWith("swift") ){ - println("Setting hadoop configuration for swift container") - kafkaProps.set_hadoop_config(sc) - } - - //Make sure the topics are already created - kafkaProps.createTopicsIfNecessary( KAFKA_TOPIC_TONE_SCORES, KAFKA_TOPIC_TOP_HASHTAGS, KAFKA_TOPIC_TOTAL_TWEETS_PROCESSED ) - - val kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer[String, String]( kafkaProps.toImmutableMap ); - - if ( !reuseCheckpoint ){ - createStreamingContextAndRunAnalytics(sc); - }else{ - ssc = StreamingContext.getOrCreate( - kafkaProps.getConfig( MessageHubConfig.CHECKPOINT_DIR_KEY ), - () => { - createStreamingContextAndRunAnalytics(sc); - }, - sc.hadoopConfiguration, - true - ); - } - - ssc.addStreamingListener( new StreamingListener() ) - - new Thread( new Runnable() { - def run(){ - while(ssc!=null){ - while(!queue.isEmpty ){ - try{ - var task:(String,String) = null; - queue.synchronized{ - task = queue.dequeue(); - } - if ( task != null ){ - val producerRecord = new ProducerRecord[String,String](task._1, "tweet", task._2 ) - val metadata = kafkaProducer.send( producerRecord ).get; - // logInfo("Sent record " + metadata.offset() + " Topic " + task._1) - log.info("Sent record " + metadata.offset() + " Topic " + task._1) - } - }catch{ - // case e:Throwable => logError(e.getMessage, e) - case e:Throwable => log.error(e.getMessage, e) - } - } - queue.synchronized{ - queue.wait(); - } - } - } - },"Message Hub producer").start - - ssc.start - - println("Twitter stream started"); - println("Tweets are collected real-time and analyzed") - println("To stop the streaming and start interacting with the data use: StreamingTwitter.stopTwitterStreaming") - - if ( !stopAfter.isZero ){ - //Automatically stop it after 10s - new Thread( new Runnable { - def run(){ - Thread.sleep( stopAfter.milliseconds ) - stopTwitterStreaming - } - }).start - } - } - - def createStreamingContextAndRunAnalytics(sc:SparkContext):StreamingContext={ - //Broadcast the config to each worker node - val broadcastVar = sc.broadcast( kafkaProps.toImmutableMap ) - ssc = new StreamingContext( sc, Seconds(5) ) - ssc.checkpoint(kafkaProps.getConfig( MessageHubConfig.CHECKPOINT_DIR_KEY )); - val stream = ssc.createKafkaStream[String, StatusAdapter,StringDeserializer, StatusDeserializer]( - kafkaProps, - List(kafkaProps.getConfig(MessageHubConfig.KAFKA_TOPIC_TWEETS )) - ); - runAnalytics(sc, broadcastVar, stream) - ssc; - } - - def runAnalytics(sc:SparkContext, broadcastVar: Broadcast[scala.collection.immutable.Map[String,String]], stream:DStream[(String,StatusAdapter)]){ - val keys = broadcastVar.value.get("tweets.key").get.split(","); - val tweets = stream.map( t => t._2) - .filter { status => - status.userLang.startsWith("en") && CharMatcher.ASCII.matchesAllOf(status.text) && ( 
keys.isEmpty || keys.exists{status.text.contains(_)}) - } - - val rowTweets = tweets.map(status=> { - lazy val client = PooledHttp1Client() - val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar ) - var scoreMap : Map[String, Double] = Map() - if ( sentiment != null ){ - // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ - for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){ - scoreMap.put( tone.tone_id, (BigDecimal(tone.score).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 ) - } - //} - } - - EnrichedTweet( - status.userName, - status.userId, - status.createdAt, - status.userLang, - status.text, - status.long, - status.lat, - scoreMap - ) - }) - - val delimTagTone = "-%!" - val delimToneScore = ":%@" - val statsStream = rowTweets.map { eTweet => ("total_tweets", 1L) } - .reduceByKey( _+_ ) - .updateStateByKey( (a:Seq[Long], b:Option[Long] ) => { - var runningCount=b.getOrElse(0L) - a.foreach { v => runningCount=runningCount+v } - Some(runningCount) - }) - statsStream.foreachRDD( rdd =>{ - queue.synchronized{ - queue+=((KAFKA_TOPIC_TOTAL_TWEETS_PROCESSED, TweetsMetricJsonSerializer.serialize(rdd.collect()))) - try{ - queue.notify - }catch{ - // case e:Throwable=>logError(e.getMessage, e) - case e:Throwable=>log.error(e.getMessage, e) - } - } - }) - - val metricsStream = rowTweets.flatMap { eTweet => { - val retList = ListBuffer[String]() - for ( tag <- eTweet.text.split("\\s+") ){ - if ( tag.startsWith( "#") && tag.length > 1 ){ - for ( tone <- Option( eTweet.sentimentScores.keys ).getOrElse( Seq() ) ){ - retList += (tag + delimTagTone + tone + delimToneScore + eTweet.sentimentScores.getOrElse( tone, 0.0)) - } - } - } - retList.toList - }} - .map { fullTag => { - val split = fullTag.split(delimToneScore); - (split(0), split(1).toFloat) - }} - .combineByKey( - (x:Float) => (x,1), - (x:(Float,Int), y:Float) => (x._1 + y, x._2+1), - (x:(Float,Int),y:(Float,Int)) => (x._1 + y._1, x._2 + y._2), - new HashPartitioner(sc.defaultParallelism) - ) - .map[(String,(Long/*count*/, List[(String, Double)]))]{ t => { - val key = t._1; - val ab = t._2; - val split = key.split(delimTagTone) - (split(0), (ab._2, List((split(1), BigDecimal(ab._1/ab._2).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble )))) - }} - .reduceByKey( (t,u) => (t._1+u._1, (t._2 ::: u._2).sortWith( (l,r) => l._1.compareTo( r._1 ) < 0 ))) - .mapValues( (item:(Long, List[(String,Double)])) => { - val unzip = item._2.unzip - (item._1/(item._2.size), unzip._1, unzip._2) - }) - .updateStateByKey( (a:scala.collection.Seq[(Long, List[String], List[Double])], b: Option[(Long, List[String], List[Double])]) => { - val safeB = b.getOrElse( (0L, List(), List() ) ) - var listTones = safeB._2 - var listScores = safeB._3 - var count = safeB._1 - for( item <- a ){ - count += item._1 - listScores = listScores.zipAll( item._3, 0.0, 0.0).map{ case(a,b)=>(a+b)/2 }.toList - listTones = item._2 - } - - Some( (count, listTones, listScores) ) - }) - - metricsStream.print - - metricsStream.foreachRDD( rdd =>{ - val topHashTags = rdd.sortBy( f => f._2._1, false ).take(5) - if ( !topHashTags.isEmpty){ - queue.synchronized{ - queue += ((KAFKA_TOPIC_TOP_HASHTAGS, TweetsMetricJsonSerializer.serialize(topHashTags.map( f => (f._1, f._2._1 ))))) - queue += ((KAFKA_TOPIC_TONE_SCORES, ToneScoreJsonSerializer.serialize(topHashTags))) - try{ - queue.notify - }catch{ - // case e:Throwable=>logError(e.getMessage, e) - case e:Throwable=>log.error(e.getMessage, e) - } - } - } - }) - } - - def 
stopTwitterStreaming(){ - if ( ssc == null){ - println("No Twitter stream to stop"); - return; - } - - println("Stopping Twitter stream. Please wait this may take a while") - ssc.stop(stopSparkContext = false, stopGracefully = true) - ssc = null - println("Twitter stream stopped"); - } -} - -object TweetsMetricJsonSerializer { - val log = Logger.getLogger(getClass.getName) - def serialize(value: Seq[(String,Long)] ): String = { - val sb = new StringBuilder("[") - var comma = "" - value.foreach( item => { - sb.append( comma + "[\"" + item._1.replaceAll("\"", "") + "\"," + item._2 + "]") - comma="," - }) - sb.append("]") - // logInfo("Serialized json: " + sb) - log.info("Serialized json: " + sb) - sb.toString() - } -} - -object ToneScoreJsonSerializer { - val log = Logger.getLogger(getClass.getName) - def serializeList[U:ClassTag]( label: String, value: List[U] ):String = { - val sb = new StringBuilder("[\"" + label.replaceAll("\"", "") + "\"") - value.foreach { item => { - if ( item.isInstanceOf[String] ) { - val s = ",\"" + item.toString().replaceAll("\"", "") + "\""; - sb.append( s.replaceAll("\"\"", "\"") ) - }else if ( item.isInstanceOf[Double] ){ - sb.append("," + item ) - } - }} - sb.append("]") - sb.toString - } - def serialize(value:Seq[(String, (Long, List[String], List[Double]))]):String={ - val sb = new StringBuilder("[") - var comma = "" - var appendToneData = true; - value.foreach( item => { - if ( appendToneData ){ - sb.append( comma + serializeList( "x", item._2._2 ) ) - appendToneData = false - comma = "," - } - sb.append( comma + serializeList( item._1, item._2._3 ) ) - comma="," - }) - sb.append("]") - // logInfo("Serialized size: " + value.size + ". Tone json: " + sb) - log.info("Serialized size: " + value.size + ". Tone json: " + sb) - sb.toString() - } -} diff --git a/streaming-twitter/PixiedustStreamingTwitter.scala b/streaming-twitter/PixiedustStreamingTwitter.scala deleted file mode 100644 index 7a87df10..00000000 --- a/streaming-twitter/PixiedustStreamingTwitter.scala +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.ibm.cds.spark.samples - -import scala.collection.mutable._ -import com.ibm.pixiedust.ChannelReceiver -// import org.apache.spark.Logging -import org.apache.log4j.Logger -import org.apache.log4j.Level -import org.apache.spark.SparkContext -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.Row -import com.ibm.cds.spark.samples.config.DemoConfig -import org.apache.spark.streaming.Seconds -import org.apache.spark.sql.types.IntegerType -import org.apache.spark.sql.types.DoubleType -import org.http4s.client.blaze.PooledHttp1Client -import org.apache.spark.sql.types.StructField -import org.apache.spark.sql.types.StringType -import com.google.common.base.CharMatcher -import com.ibm.couchdb.CouchDb -import com.ibm.couchdb.TypeMapping -import com.ibm.couchdb.CouchDbApi -import org.apache.spark.sql.SQLContext -import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError -import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped -import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted -import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted -import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted -import org.apache.spark.SparkConf -import org.apache.spark.streaming.dstream.DStream -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.HashPartitioner -import twitter4j.Status -// import org.codehaus.jettison.json.JSONObject -import org.json.JSONObject -import org.apache.spark.AccumulableParam -import org.apache.spark.streaming.StreamingContextState -import org.apache.spark.sql.DataFrame - -/* @author dtaieb - * Twitter+Watson sentiment analysis app powered by Pixiedust - */ -object PixiedustStreamingTwitter extends ChannelReceiver() { - var ssc: StreamingContext = null - var workingRDD: RDD[Row] = null - //Hold configuration key/value pairs - lazy val config = new DemoConfig - lazy val logger: Logger = Logger.getLogger( "com.ibm.cds.spark.samples.PixiedustStreamingTwitter" ) - - val BEGINSTREAM = "@BEGINSTREAM@" - val ENDSTREAM = "@ENDSTREAM@" - - def sendLog(s:String){ - send("log", s) - } - - //Wrapper api for Notebook access - def setConfig(key:String, value:String){ - config.setConfig(key, value) - } - - //main method invoked when running as a standalone Spark Application - def main(args: Array[String]) { - val conf = new SparkConf().setAppName("Pixiedust Spark Streaming Twitter Demo") - val sc = new SparkContext(conf) - startStreaming(); - } - - def createTwitterDataFrames(sqlContext: SQLContext) : DataFrame = { - if ( workingRDD == null || workingRDD.count <= 0 ){ - println("No data receive. 
Please start the Twitter stream again to collect data") - return null - } - - sqlContext.createDataFrame( workingRDD, schemaTweets ) - } - - class PixiedustStreamingListener extends org.apache.spark.streaming.scheduler.StreamingListener { - override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { - sendLog("Receiver Started: " + receiverStarted.receiverInfo.name ) - //Signal the frontend that we started streaming - sendLog(BEGINSTREAM) - } - - override def onReceiverError(receiverError: StreamingListenerReceiverError) { - sendLog("Receiver Error: " + receiverError.receiverInfo.lastError) - } - - override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { - sendLog("Receiver Stopped: " + receiverStopped.receiverInfo.name) - sendLog("Reason: " + receiverStopped.receiverInfo.lastError + " : " + receiverStopped.receiverInfo.lastErrorMessage) - //signal the front end that we're done streaming - sendLog(ENDSTREAM) - } - - override def onBatchStarted(batchStarted: StreamingListenerBatchStarted){ - sendLog("Batch started with " + batchStarted.batchInfo.numRecords + " records") - } - - override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted){ - sendLog("Batch completed with " + batchCompleted.batchInfo.numRecords + " records"); - } - } - - val reuseCheckpoint = false; - - def startStreaming(){ - val sc = SparkContext.getOrCreate - sendLog("Starting twitter stream"); - if ( ssc != null ){ - sendLog("Twitter Stream already running"); - sendLog("Please use stopTwitterStreaming() first and try again"); - return; - } - - if ( !config.validateConfiguration() ){ - sendLog("Unable to validate config") - sendLog(ENDSTREAM) - return; - } - - Logger.getLogger("org.apache.spark").setLevel(Level.OFF) - - //Set the hadoop configuration if needed - val checkpointDir = config.getConfig( DemoConfig.CHECKPOINT_DIR_KEY ); - if ( checkpointDir.startsWith("swift") ){ - println("Setting hadoop configuration for swift container") - config.set_hadoop_config(sc) - } - - workingRDD = sc.emptyRDD - - if ( !reuseCheckpoint ){ - ssc = createStreamingContextAndRunAnalytics(sc); - }else{ - ssc = StreamingContext.getOrCreate( - config.getConfig( DemoConfig.CHECKPOINT_DIR_KEY ), - () => { - createStreamingContextAndRunAnalytics(sc); - }, - sc.hadoopConfiguration, - true - ); - } - - ssc.addStreamingListener( new PixiedustStreamingListener ) - - ssc.start() - - sendLog("Twitter stream started"); - } - - def stopStreaming(){ - if ( ssc == null){ - sendLog("No Twitter stream to stop"); - return; - } - - sendLog("Stopping Twitter stream. 
Please wait this may take a while") - ssc.stop(stopSparkContext = false, stopGracefully = false) - ssc = null - sendLog("Twitter stream stopped"); - } - - def createStreamingContextAndRunAnalytics(sc:SparkContext):StreamingContext={ - //Broadcast the config to each worker node - val broadcastVar = sc.broadcast( config.toImmutableMap ) - ssc = new StreamingContext( sc, Seconds(5) ) - ssc.checkpoint(config.getConfig( DemoConfig.CHECKPOINT_DIR_KEY )); - val stream = org.apache.spark.streaming.twitter.TwitterUtils.createStream( ssc, None ); - runAnalytics(sc, broadcastVar, stream) - ssc; - } - - def runAnalytics(sc:SparkContext, broadcastVar: Broadcast[scala.collection.immutable.Map[String,String]], stream:DStream[Status]){ - val keys = broadcastVar.value.get("tweets.key").get.split(","); - val tweets = stream.filter { status => - Option(status.getUser).flatMap[String] { - u => Option(u.getLang) - }.getOrElse("").startsWith("en") && CharMatcher.ASCII.matchesAllOf(status.getText) && ( keys.isEmpty || keys.exists{key => status.getText.toLowerCase.contains(key.toLowerCase)}) - } - - val tweetAccumulator = sc.accumulable(Array[(String,String)]())(TweetsAccumulatorParam) - - new Thread( new Runnable() { - def run(){ - try{ - while(ssc!=null && ssc.getState() != StreamingContextState.STOPPED ){ - val accuValue = tweetAccumulator.value - if ( accuValue.size > 0 ){ - tweetAccumulator.setValue(Array[(String,String)]() ) - accuValue.foreach( v => send(v._1, v._2) ) - } - Thread.sleep( 1000L ) - } - System.out.println("Stopping the accumulator thread") - }catch{ - case e:Throwable => e.printStackTrace() - } - } - },"Accumulator").start - - val rowTweets = tweets.map(status=> { - lazy val client = PooledHttp1Client() - val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar ) - var scoreMap : Map[String, Double] = Map() - if ( sentiment != null ){ - // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ - for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){ - scoreMap.put( tone.tone_id, (BigDecimal(tone.score).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 ) - } - // } - } - - var jsonSentiment="{"; - scoreMap.foreach( t => jsonSentiment = jsonSentiment + (if (jsonSentiment.length() == 1) "" else ",") + "\"" + t._1 + "\":" + t._2) - jsonSentiment += "}"; - val sendValue:String = "{\"author\": \"" + - status.getUser.getName + - "\", \"userid\":\"" + status.getUser.getScreenName + - "\", \"pic\":\"" + status.getUser.getOriginalProfileImageURLHttps + - "\",\"text\":" + JSONObject.quote( status.getText ) + ", \"sentiment\": " + jsonSentiment + "}" - - tweetAccumulator+=("tweets",sendValue) - - EnrichedTweet( - status.getUser.getName, - status.getUser.getScreenName, - status.getCreatedAt.toString, - status.getUser.getLang, - status.getText, - Option(status.getGeoLocation).map{ _.getLatitude}.getOrElse(0.0), - Option(status.getGeoLocation).map{ _.getLongitude}.getOrElse(0.0), - scoreMap - ) - }) - - rowTweets.foreachRDD( rdd => { - if( rdd.count > 0 ){ - workingRDD = SparkContext.getOrCreate().parallelize( rdd.map( t => t.toRow() ).collect()).union( workingRDD ) - } - }) - - val delimTagTone = "-%!" 
- val delimToneScore = ":%@" - val statsStream = rowTweets.map { eTweet => ("total_tweets", 1L) } - .reduceByKey( _+_ ) - .updateStateByKey( (a:scala.collection.Seq[Long], b:Option[Long] ) => { - var runningCount=b.getOrElse(0L) - a.foreach { v => runningCount=runningCount+v } - Some(runningCount) - }) - statsStream.foreachRDD( rdd =>{ - send("TweetProcessed", TweetsMetricJsonSerializer.serialize(rdd.collect())) - }) - - val metricsStream = rowTweets.flatMap { eTweet => { - val retList = ListBuffer[String]() - for ( tag <- eTweet.text.split("\\s+") ){ - if ( tag.startsWith( "#") && tag.length > 1 ){ - for ( tone <- Option( eTweet.sentimentScores.keys ).getOrElse( Seq() ) ){ - retList += (tag + delimTagTone + tone + delimToneScore + eTweet.sentimentScores.getOrElse( tone, 0.0)) - } - } - } - retList.toList - }} - .map { fullTag => { - val split = fullTag.split(delimToneScore); - (split(0), split(1).toFloat) - }} - .combineByKey( - (x:Float) => (x,1), - (x:(Float,Int), y:Float) => (x._1 + y, x._2+1), - (x:(Float,Int),y:(Float,Int)) => (x._1 + y._1, x._2 + y._2), - new HashPartitioner(sc.defaultParallelism) - ) - .map[(String,(Long/*count*/, List[(String, Double)]))]{ t => { - val key = t._1; - val ab = t._2; - val split = key.split(delimTagTone) - (split(0), (ab._2, List((split(1), BigDecimal(ab._1/ab._2).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble )))) - }} - .reduceByKey( (t,u) => (t._1+u._1, (t._2 ::: u._2).sortWith( (l,r) => l._1.compareTo( r._1 ) < 0 ))) - .mapValues( (item:(Long, List[(String,Double)])) => { - val unzip = item._2.unzip - (item._1/(item._2.size), unzip._1, unzip._2) - }) - .updateStateByKey( (a:scala.collection.Seq[(Long, List[String], List[Double])], b: Option[(Long, List[String], List[Double])]) => { - val safeB = b.getOrElse( (0L, List(), List() ) ) - var listTones = safeB._2 - var listScores = safeB._3 - var count = safeB._1 - for( item <- a ){ - count += item._1 - listScores = listScores.zipAll( item._3, 0.0, 0.0).map{ case(a,b)=>(a+b)/2 }.toList - listTones = item._2 - } - - Some( (count, listTones, listScores) ) - }) - - metricsStream.print - - metricsStream.foreachRDD( rdd =>{ - val topHashTags = rdd.sortBy( f => f._2._1, false ).take(5) - if ( !topHashTags.isEmpty){ - tweetAccumulator+=("topHashtags", TweetsMetricJsonSerializer.serialize(topHashTags.map( f => (f._1, f._2._1 )))) - tweetAccumulator+=("toneScores", ToneScoreJsonSerializer.serialize(topHashTags)) - } - }) - - } -} - -object TweetsAccumulatorParam extends AccumulableParam[Array[(String,String)], (String,String)]{ - def zero(initialValue:Array[(String,String)]):Array[(String,String)] = { - Array() - } - - def addInPlace(s1:Array[(String,String)], s2:Array[(String,String)]):Array[(String,String)] = { - s1 ++ s2 - } - - def addAccumulator(current:Array[(String,String)], s:(String,String)):Array[(String,String)] = { - current :+ s - } -} diff --git a/streaming-twitter/StatusSerializer.scala b/streaming-twitter/StatusSerializer.scala deleted file mode 100644 index 67446eca..00000000 --- a/streaming-twitter/StatusSerializer.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.cds.spark.samples - -import java.io.ObjectOutputStream -import java.io.ByteArrayOutputStream -import org.apache.kafka.common.serialization.Serializer -import twitter4j.Status - -/** - * @author dtaieb - */ -class StatusSerializer extends Serializer[Status]{ - def configure( props: java.util.Map[String, _], isKey: Boolean) = { - - } - - def close(){ - - } - - def serialize(topic: String, value: Status ): Array[Byte] = { - val baos = new ByteArrayOutputStream(1024) - val oos = new ObjectOutputStream(baos) - oos.writeObject( value ) - oos.close - baos.toByteArray() - } -} \ No newline at end of file diff --git a/streaming-twitter/StreamingListener.scala b/streaming-twitter/StreamingListener.scala deleted file mode 100644 index 8fdf0461..00000000 --- a/streaming-twitter/StreamingListener.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.ibm.cds.spark.samples - -import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError -import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped -import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted -import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted -import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted - -/** - * @author dtaieb - */ -class StreamingListener extends org.apache.spark.streaming.scheduler.StreamingListener { - override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { - println("Receiver Started: " + receiverStarted.receiverInfo.name ) - } - - override def onReceiverError(receiverError: StreamingListenerReceiverError) { - println("Receiver Error: " + receiverError.receiverInfo.lastError) - } - - override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { - println("Receiver Stopped: " + receiverStopped.receiverInfo.name) - println("Reason: " + receiverStopped.receiverInfo.lastError + " : " + receiverStopped.receiverInfo.lastErrorMessage) - } - - override def onBatchStarted(batchStarted: StreamingListenerBatchStarted){ - println("Batch started with " + batchStarted.batchInfo.numRecords + " records") - } - - override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted){ - println("Batch completed with " + batchCompleted.batchInfo.numRecords + " records"); - } -} \ No newline at end of file diff --git a/streaming-twitter/StreamingTwitter.scala b/streaming-twitter/StreamingTwitter.scala deleted file mode 100644 index 8ae62957..00000000 --- a/streaming-twitter/StreamingTwitter.scala +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.ibm.cds.spark.samples - -import scala.collection.mutable._ -import org.apache.commons.lang3.StringEscapeUtils -import org.apache.log4j.Level -import org.apache.log4j.Logger -import org.apache.spark.Accumulator -import org.apache.spark.SparkConf -import org.apache.spark.SparkContext -import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream._ -import org.http4s._ -import org.http4s.Http4s._ -import org.http4s.Status._ -import org.http4s.client.Client -import org.http4s.client.blaze.PooledHttp1Client -import org.http4s.headers.Authorization -import com.ibm.couchdb._ -import scalaz._ -import scalaz.concurrent.Task -import twitter4j.Status -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.Row -import org.apache.spark.sql.types._ -import org.apache.spark.sql.DataFrame -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.EmptyRDD -import com.google.common.base.CharMatcher -import scala.math.BigDecimal -import com.ibm.cds.spark.samples.config.DemoConfig -import com.ibm.cds.spark.samples.ToneAnalyzer.ToneCategory -// import org.apache.spark.Logging - - - - -/** - * @author dtaieb - */ -object StreamingTwitter { - val log = Logger.getLogger(getClass.getName) - var ssc: StreamingContext = null - var sqlContext: SQLContext = null - var workingRDD: RDD[Row] = null - var schemaTweets : StructType = null - val logger: Logger = Logger.getLogger( "com.ibm.cds.spark.samples.StreamingTwitter" ) - - //main method invoked when running as a standalone Spark Application - def main(args: Array[String]) { - - val conf = new SparkConf().setAppName("Spark Streaming Twitter Demo") - val sc = new SparkContext(conf) - startTwitterStreaming(sc, Seconds(10)); - } - - //Hold configuration key/value pairs - val config = new DemoConfig - - //Wrapper api for Notebook access - def setConfig(key:String, value:String){ - config.setConfig(key, value) - } - - def startTwitterStreaming( sc: SparkContext, stopAfter: Duration = Seconds(0) ){ - println("Starting twitter stream"); - if ( ssc != null ){ - println("Twitter Stream already running"); - println("Please use stopTwitterStreaming() first and try again"); - return; - } - - if ( !config.validateConfiguration(DemoConfig.CHECKPOINT_DIR_KEY) ){ - println("Unable to validate config") - return; - } - - Logger.getLogger("org.apache.spark").setLevel(Level.OFF) - - workingRDD = sc.emptyRDD - //Broadcast the config to each worker node - val broadcastVar = sc.broadcast(config.toImmutableMap) - - var canStopTwitterStream = true - var batchesProcessed=0 - - ssc = new StreamingContext( sc, Seconds(5) ) - - ssc.addStreamingListener( new StreamingListener ) - - try{ - sqlContext = new SQLContext(sc) - val keys = config.getConfig("tweets.key").split(","); - val stream = org.apache.spark.streaming.twitter.TwitterUtils.createStream( ssc, None ); - - if ( schemaTweets == null ){ - val schemaString = "author userid date lang text lat:double long:double" - schemaTweets = - StructType( - schemaString.split(" ").map( - fieldName => { - val ar = fieldName.split(":") - StructField( - ar.lift(0).get, - ar.lift(1).getOrElse("string") match{ - case "int" => IntegerType - case "double" => DoubleType - case _ => StringType - }, - true) - } - ).union( - ToneAnalyzer.sentimentFactors.map( f => StructField( f._1, DoubleType )).toArray[StructField] - ) - ) - } - val tweets = stream.filter { status => - Option(status.getUser).flatMap[String] { - u => Option(u.getLang) - }.getOrElse("").startsWith("en") && 
CharMatcher.ASCII.matchesAllOf(status.getText) && ( keys.isEmpty || keys.exists{status.getText.contains(_)}) - } - - lazy val client = PooledHttp1Client() - val rowTweets = tweets.map(status=> { - val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar ) - - var colValues = Array[Any]( - status.getUser.getName, //author - status.getUser.getScreenName, //Userid - status.getCreatedAt.toString, //date - status.getUser.getLang, //Lang - status.getText, //text - Option(status.getGeoLocation).map{ _.getLatitude}.getOrElse(0.0), //lat - Option(status.getGeoLocation).map{_.getLongitude}.getOrElse(0.0) //long - //exception - ) - - var scoreMap : Map[String, Double] = Map() - if ( sentiment != null ){ - // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ - for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){ - scoreMap.put( tone.tone_id, tone.score ) - } - // } - } - - colValues = colValues ++ ToneAnalyzer.sentimentFactors.map { f => (BigDecimal(scoreMap.get(f._2).getOrElse(0.0)).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 } - //Return [Row, (sentiment, status)] - (Row(colValues.toArray:_*),(sentiment, status)) - }) - - rowTweets.foreachRDD( rdd => { - if(batchesProcessed==0){ - canStopTwitterStream=false - } - try{ - if( rdd.count > 0 ){ - batchesProcessed += 1 - workingRDD = sc.parallelize( rdd.map( t => t._1 ).collect()).union( workingRDD ) - - val saveToCloudant = broadcastVar.value.get("cloudant.save").get.toBoolean - if ( saveToCloudant ){ - rdd.foreachPartition { iterator => - var db: CouchDbApi = null; - val couch = CouchDb( broadcastVar.value.get("cloudant.hostName").get, - broadcastVar.value.get("cloudant.port").get.toInt, - broadcastVar.value.get("cloudant.https").get.toBoolean, - broadcastVar.value.get("cloudant.username").get, - broadcastVar.value.get("cloudant.password").get - ); - val dbName = "spark-streaming-twitter" - couch.dbs.get(dbName).attemptRun match{ - case -\/(e) => logger.trace("Couch Database does not exist, creating it now"); couch.dbs.create(dbName).run - case \/-(a) => println("Connected to cloudant db " + dbName ) - } - val typeMapping = TypeMapping(classOf[ToneAnalyzer.Tweet] -> "Tweet") - db = couch.db(dbName, typeMapping) - iterator.foreach( t => { - saveTweetToCloudant( client, db, t._2._2, t._2._1 ) - } - ) - } - } - } - }catch{ - case e: InterruptedException=>//Ignore - // case e: Exception => logError(e.getMessage, e ) - case e: Exception => log.error(e.getMessage, e ) - }finally{ - canStopTwitterStream = true - } - }) - - }catch{ - // case e : Exception => logError(e.getMessage, e ) - case e : Exception => log.error(e.getMessage, e ) - return - } - ssc.start() - - println("Twitter stream started"); - println("Tweets are collected real-time and analyzed") - println("To stop the streaming and start interacting with the data use: StreamingTwitter.stopTwitterStreaming") - - if ( !stopAfter.isZero ){ - //Automatically stop it after 10s - new Thread( new Runnable { - var displayMessage = true; - def run(){ - Thread.sleep( stopAfter.milliseconds ) - var loop = true - while(loop){ - if (canStopTwitterStream){ - stopTwitterStreaming - loop = false - }else{ - if ( displayMessage ){ - displayMessage = false - println("Received directive to stop twitter Stream: Waiting for already received tweets to be processed...") - } - Thread.sleep(5000L) - } - } - } - }).start - } - } - - def saveTweetToCloudant(client: Client, db: CouchDbApi, status:Status, sentiment: ToneAnalyzer.ToneCategory) : Status = { - 
if ( db != null){ - logger.trace("Creating new Tweet in Couch Database " + status.getText()) - val task:Task[Res.DocOk] = db.docs.create( - ToneAnalyzer.Tweet( - status.getUser().getName, - status.getCreatedAt().toString(), - status.getUser().getLang(), - status.getText(), - ToneAnalyzer.Geo( - Option(status.getGeoLocation).map{ _.getLatitude}.getOrElse(0.0), - Option(status.getGeoLocation).map{_.getLongitude}.getOrElse(0.0) - ), - sentiment - ) - ) - - // Execute the actions and process the result - task.attemptRun match { - // case -\/(e) => logError(e.getMessage, e ); - case -\/(e) => log.error(e.getMessage, e ); - case \/-(a) => logger.trace("Successfully create new Tweet in Couch Database " + status.getText() ) - } - } - - status - } - - def createTwitterDataFrames(sc: SparkContext) : (SQLContext, DataFrame) = { - if ( workingRDD.count <= 0 ){ - println("No data receive. Please start the Twitter stream again to collect data") - return null - } - - try{ - val df = sqlContext.createDataFrame( workingRDD, schemaTweets ) - df.registerTempTable("tweets") - - println("A new table named tweets with " + df.count() + " records has been correctly created and can be accessed through the SQLContext variable") - println("Here's the schema for tweets") - df.printSchema() - - (sqlContext, df) - }catch{ - // case e: Exception => {logError(e.getMessage, e ); return null} - case e: Exception => {log.error(e.getMessage, e ); return null} - } - } - - def stopTwitterStreaming(){ - if ( ssc == null){ - println("No Twitter stream to stop"); - return; - } - - println("Stopping Twitter stream. Please wait this may take a while") - ssc.stop(stopSparkContext = false, stopGracefully = false) - ssc = null - println("Twitter stream stopped"); - - println( "You can now create a sqlContext and DataFrame with " + workingRDD.count + " Tweets created. 
Sample usage: ") - println("val (sqlContext, df) = com.ibm.cds.spark.samples.StreamingTwitter.createTwitterDataFrames(sc)") - println("df.printSchema") - println("sqlContext.sql(\"select author, text from tweets\").show") - } -} diff --git a/streaming-twitter/ToneAnalyzer.scala b/streaming-twitter/ToneAnalyzer.scala deleted file mode 100644 index ad43f3b7..00000000 --- a/streaming-twitter/ToneAnalyzer.scala +++ /dev/null @@ -1,100 +0,0 @@ -package com.ibm.cds.spark.samples - -import org.http4s.EntityEncoder -import org.http4s.Uri -import org.http4s.client.Client -import org.http4s.Request -import org.http4s.BasicCredentials -import org.http4s.Header -import org.http4s.Headers -import org.http4s.Method -import org.http4s.headers.Authorization -import org.apache.log4j.Logger -import org.apache.spark.broadcast.Broadcast -// import org.apache.spark.Logging -import org.apache.log4j.Logger -// import scala.util.parsing.json.JSON -// import org.codehaus.jettison.json.JSONObject -import org.json.JSONObject -import scala.util.parsing.json._ - -/** - * @author dtaieb - */ - -object ToneAnalyzer { - val log = Logger.getLogger(getClass.getName) - - val sentimentFactors = Array( - ("Anger","anger"), - ("Disgust","disgust"), - ("Fear","fear"), - ("Joy","joy"), - ("Sadness","sadness"), - ("Analytical","analytical"), - ("Confident","confident"), - ("Tentative","tentative"), - ("Openness","openness_big5"), - ("Conscientiousness","conscientiousness_big5"), - ("Extraversion","extraversion_big5"), - ("Agreeableness","agreeableness_big5"), - ("EmotionalRange","neuroticism_big5") - ) - - //Class models for Sentiment JSON - // case class DocumentTone( document_tone: Sentiment ) - // case class Sentiment(tone_categories: Seq[ToneCategory]); - // case class ToneCategory(category_id: String, category_name: String, tones: Seq[Tone]); - case class DocumentTone(document_tone: ToneCategory) - case class ToneCategory(tones: Seq[Tone]); - case class Tone(score: Double, tone_id: String, tone_name: String) -// case class Sentiment( scorecard: String, children: Seq[Tone] ) -// case class Tone( name: String, id: String, children: Seq[ToneResult]) -// case class ToneResult(name: String, id: String, word_count: Double, normalized_score: Double, raw_score: Double, linguistic_evidence: Seq[LinguisticEvidence] ) -// case class LinguisticEvidence( evidence_score: Double, word_count: Double, correlation: String, words : Seq[String]) - - case class Geo( lat: Double, long: Double ) - case class Tweet(author: String, date: String, language: String, text: String, geo : Geo, sentiment : ToneCategory) - - def computeSentiment( client: Client, status:StatusAdapter, broadcastVar: Broadcast[Map[String,String]] ) : ToneCategory = { - // logTrace("Calling sentiment from Watson Tone Analyzer: " + status.text) - log.trace("Calling sentiment from Watson Tone Analyzer: " + status.text) - try{ - //Get Sentiment on the tweet - val sentimentResults: String = - EntityEncoder[String].toEntity("{\"text\": " + JSONObject.quote( status.text ) + "}" ).flatMap { - entity => - val s = broadcastVar.value.get("watson.tone.url").get + "/v3/tone?version=" + broadcastVar.value.get("watson.api.version").get - val toneuri: Uri = Uri.fromString( s ).getOrElse( null ) - client( - Request( - method = Method.POST, - uri = toneuri, - headers = Headers( - Authorization( - BasicCredentials(broadcastVar.value.get("watson.tone.username").get, broadcastVar.value.get("watson.tone.password").get) - ), - Header("Accept", "application/json"), - Header("Content-Type", 
"application/json; charset=utf-8") - ), - body = entity.body - ) - ).flatMap { response => - if (response.status.code == 200 ) { - response.as[String] - } else { - println( "Error received from Watson Tone Analyzer. Code : " + response.status.code + " reason: " + response.status.reason ) - null - } - } - }.run - - upickle.read[DocumentTone](sentimentResults).document_tone - }catch{ - case e:Throwable => { - e.printStackTrace() - null - } - } - } -} diff --git a/streaming-twitter/TwitterAdapter.scala b/streaming-twitter/TwitterAdapter.scala deleted file mode 100644 index 221ae278..00000000 --- a/streaming-twitter/TwitterAdapter.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.ibm.cds.spark.samples - -import java.io.ObjectInputStream -import java.io.ByteArrayInputStream -import scala.util.parsing.json.JSON -import org.apache.kafka.common.serialization.Deserializer -import twitter4j.Status - -/** - * @author dtaieb - * Deserialization adapters for Twitter4J Status - */ - -case class StatusAdapter(userName:String, userId: String, userLang: String,createdAt:String,text:String, long:Double, lat:Double); - -object StatusAdapter{ - implicit def statusAdapterWrapper(status: Status) = - StatusAdapter( - status.getUser.getName, - status.getUser.getScreenName, - status.getUser.getLang, - status.getCreatedAt.toString, - status.getText, - Option(status.getGeoLocation).map{ _.getLongitude}.getOrElse(0.0), - Option(status.getGeoLocation).map{ _.getLatitude}.getOrElse(0.0) - ) -} - -class StatusDeserializer extends Deserializer[StatusAdapter]{ - def configure( props: java.util.Map[String, _], isKey: Boolean) = { - - } - - def close(){ - - } - - def deserialize(topic: String, data: Array[Byte] ): StatusAdapter = { - try{ - val bais = new ByteArrayInputStream( data ) - var ois:ObjectInputStream = null - try{ - ois = new ObjectInputStream( bais ) - ois.readObject().asInstanceOf[Status] - }finally{ - if (bais != null ){ - bais.close - } - if ( ois != null ){ - ois.close - } - } - }catch{ - case e:Throwable=>{ - val jsonObject = JSON.parseFull( new String(data) ).getOrElse(Map.empty).asInstanceOf[Map[String, Any]] - val user=jsonObject.get("user").getOrElse( Map.empty ).asInstanceOf[Map[String,Any]] - val geo = Option(jsonObject.get("geo").orNull).getOrElse(Map.empty).asInstanceOf[Map[String,Any]] - StatusAdapter( - user.get("name").getOrElse("").asInstanceOf[String], - user.get("userid").getOrElse("").asInstanceOf[String], - user.get("lang").getOrElse("").asInstanceOf[String], - jsonObject.get("created_at").getOrElse("").asInstanceOf[String], - jsonObject.get("text").getOrElse("").asInstanceOf[String], - geo.get("long").getOrElse(0.0).asInstanceOf[Double], - geo.get("lat").getOrElse(0.0).asInstanceOf[Double] - 
) - } - } - } -} \ No newline at end of file diff --git a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/KafkaProducerTest.scala b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/KafkaProducerTest.scala index 46303385..ea531b86 100644 --- a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/KafkaProducerTest.scala +++ b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/KafkaProducerTest.scala @@ -36,20 +36,22 @@ import twitter4j.Status import twitter4j.StatusDeletionNotice import twitter4j.StatusListener import twitter4j.TwitterStreamFactory +import twitter4j.TwitterStream import scala.util.parsing.json.JSON import java.io.InputStream -import twitter4j.TwitterStream import com.ibm.cds.spark.samples.config.DemoConfig -import org.apache.spark.Logging +// import org.apache.spark.Logging +import org.apache.log4j.Logger /** * @author dtaieb */ -object KafkaProducerTest extends Logging{ +object KafkaProducerTest { //Very verbose, enable only if necessary //Logger.getLogger("org.apache.kafka").setLevel(Level.ALL) //Logger.getLogger("kafka").setLevel(Level.ALL) + val log = Logger.getLogger(getClass.getName) var twitterStream : TwitterStream = _; @@ -79,11 +81,13 @@ object KafkaProducerTest extends Logging{ def onStatus(status: Status){ if ( lastSent == 0 || System.currentTimeMillis() - lastSent > 200L){ lastSent = System.currentTimeMillis() - logInfo("Got a status " + status.getText ) + // logInfo("Got a status " + status.getText ) + log.info("Got a status " + status.getText ) val producerRecord = new ProducerRecord(kafkaProps.getConfig(MessageHubConfig.KAFKA_TOPIC_TWEETS ), "tweet", status ) try{ val metadata = kafkaProducer.send( producerRecord ).get(2000, TimeUnit.SECONDS); - logInfo("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset ) + // logInfo("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset ) + log.info("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset ) }catch{ case e:Throwable => e.printStackTrace } @@ -98,7 +102,8 @@ object KafkaProducerTest extends Logging{ def onException( e: Exception){ println("Unexpected error from twitterStream: " + e.getMessage); - logError(e.getMessage, e) + // logError(e.getMessage, e) + log.error(e.getMessage, e) } def onScrubGeo(lat: Long, long: Long ){ @@ -149,4 +154,4 @@ object KafkaConsumerTest { } }).start } -} \ No newline at end of file +} diff --git a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/MessageHubStreamingTwitter.scala b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/MessageHubStreamingTwitter.scala index 18ca53de..1dfa086e 100644 --- a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/MessageHubStreamingTwitter.scala +++ b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/MessageHubStreamingTwitter.scala @@ -50,14 +50,17 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted import org.apache.spark.broadcast.Broadcast -import org.apache.spark.Logging +// import org.apache.spark.Logging +import org.apache.log4j.Logger + import java.util.Arrays /** * @author dtaieb * Twitter+Watson sample app with MessageHub/Kafka */ -object MessageHubStreamingTwitter extends Logging{ +object MessageHubStreamingTwitter { + val log = Logger.getLogger(getClass.getName) var ssc: StreamingContext = null 
val reuseCheckpoint = false; @@ -70,7 +73,7 @@ object MessageHubStreamingTwitter extends Logging{ //Logger.getLogger("org.apache.kafka").setLevel(Level.ALL) //Logger.getLogger("kafka").setLevel(Level.ALL) - Logger.getLogger("org.apache.spark").setLevel(Level.WARN) + // Logger.getLogger("org.apache.spark").setLevel(Level.WARN) def main(args: Array[String]): Unit = { println("Printing arguments: "); @@ -151,10 +154,12 @@ object MessageHubStreamingTwitter extends Logging{ if ( task != null ){ val producerRecord = new ProducerRecord[String,String](task._1, "tweet", task._2 ) val metadata = kafkaProducer.send( producerRecord ).get; - logInfo("Sent record " + metadata.offset() + " Topic " + task._1) + // logInfo("Sent record " + metadata.offset() + " Topic " + task._1) + log.info("Sent record " + metadata.offset() + " Topic " + task._1) } }catch{ - case e:Throwable => logError(e.getMessage, e) + // case e:Throwable => logError(e.getMessage, e) + case e:Throwable => log.error(e.getMessage, e) } } queue.synchronized{ @@ -206,11 +211,11 @@ object MessageHubStreamingTwitter extends Logging{ val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar ) var scoreMap : Map[String, Double] = Map() if ( sentiment != null ){ - for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ - for ( tone <- Option( toneCategory.tones ).getOrElse( Seq() ) ){ + // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){ + for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){ scoreMap.put( tone.tone_id, (BigDecimal(tone.score).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 ) } - } + //} } EnrichedTweet( @@ -240,7 +245,8 @@ object MessageHubStreamingTwitter extends Logging{ try{ queue.notify }catch{ - case e:Throwable=>logError(e.getMessage, e) + // case e:Throwable=>logError(e.getMessage, e) + case e:Throwable=>log.error(e.getMessage, e) } } }) @@ -302,7 +308,8 @@ object MessageHubStreamingTwitter extends Logging{ try{ queue.notify }catch{ - case e:Throwable=>logError(e.getMessage, e) + // case e:Throwable=>logError(e.getMessage, e) + case e:Throwable=>log.error(e.getMessage, e) } } } @@ -322,7 +329,8 @@ object MessageHubStreamingTwitter extends Logging{ } } -object TweetsMetricJsonSerializer extends Logging{ +object TweetsMetricJsonSerializer { + val log = Logger.getLogger(getClass.getName) def serialize(value: Seq[(String,Long)] ): String = { val sb = new StringBuilder("[") var comma = "" @@ -331,12 +339,14 @@ object TweetsMetricJsonSerializer extends Logging{ comma="," }) sb.append("]") - logInfo("Serialized json: " + sb) + // logInfo("Serialized json: " + sb) + log.info("Serialized json: " + sb) sb.toString() } } -object ToneScoreJsonSerializer extends Logging{ +object ToneScoreJsonSerializer { + val log = Logger.getLogger(getClass.getName) def serializeList[U:ClassTag]( label: String, value: List[U] ):String = { val sb = new StringBuilder("[\"" + label.replaceAll("\"", "") + "\"") value.foreach { item => { @@ -364,7 +374,8 @@ object ToneScoreJsonSerializer extends Logging{ comma="," }) sb.append("]") - logInfo("Serialized size: " + value.size + ". Tone json: " + sb) + // logInfo("Serialized size: " + value.size + ". Tone json: " + sb) + log.info("Serialized size: " + value.size + ". 
diff --git a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/PixiedustStreamingTwitter.scala b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/PixiedustStreamingTwitter.scala
index 7f3fdcb2..7a87df10 100644
--- a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/PixiedustStreamingTwitter.scala
+++ b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/PixiedustStreamingTwitter.scala
@@ -19,7 +19,7 @@ package com.ibm.cds.spark.samples
 
 import scala.collection.mutable._
 import com.ibm.pixiedust.ChannelReceiver
-import org.apache.spark.Logging
+// import org.apache.spark.Logging
 import org.apache.log4j.Logger
 import org.apache.log4j.Level
 import org.apache.spark.SparkContext
@@ -49,7 +49,8 @@ import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.HashPartitioner
 import twitter4j.Status
-import org.codehaus.jettison.json.JSONObject
+// import org.codehaus.jettison.json.JSONObject
+import org.json.JSONObject
 import org.apache.spark.AccumulableParam
 import org.apache.spark.streaming.StreamingContextState
 import org.apache.spark.sql.DataFrame
@@ -57,7 +58,7 @@ import org.apache.spark.sql.DataFrame
 /* @author dtaieb
  * Twitter+Watson sentiment analysis app powered by Pixiedust
  */
-object PixiedustStreamingTwitter extends ChannelReceiver() with Logging{
+object PixiedustStreamingTwitter extends ChannelReceiver() {
   var ssc: StreamingContext = null
   var workingRDD: RDD[Row] = null
   //Hold configuration key/value pairs
@@ -222,11 +223,11 @@ object PixiedustStreamingTwitter extends ChannelReceiver() with Logging{
         val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar )
         var scoreMap : Map[String, Double] = Map()
         if ( sentiment != null ){
-          for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){
-            for ( tone <- Option( toneCategory.tones ).getOrElse( Seq() ) ){
+          // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){
+            for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){
              scoreMap.put( tone.tone_id, (BigDecimal(tone.score).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 )
            }
-          }
+          // }
         }
 
         var jsonSentiment="{";
@@ -342,4 +343,4 @@ object TweetsAccumulatorParam extends AccumulableParam[Array[(String,String)], (
   def addAccumulator(current:Array[(String,String)], s:(String,String)):Array[(String,String)] = {
     current :+ s
   }
-}
\ No newline at end of file
+}
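Besides the same tones-loop rewrite, this file swaps org.codehaus.jettison.json.JSONObject for org.json.JSONObject; for the simple build-a-JSON-string usage here the two classes expose a compatible put/toString API. A hedged sketch of that usage (the keys and values are invented for illustration):

    import org.json.JSONObject

    object JsonObjectSketch extends App {
      // org.json.JSONObject accepts the same fluent put(...).toString
      // calls previously made against the jettison JSONObject.
      val jsonSentiment = new JSONObject()
        .put("anger", 25.0) // illustrative keys/values
        .put("joy", 75.0)
      println(jsonSentiment.toString) // e.g. {"anger":25,"joy":75} (key order not guaranteed)
    }
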
diff --git a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/StreamingTwitter.scala b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/StreamingTwitter.scala
index 7410f991..8ae62957 100644
--- a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/StreamingTwitter.scala
+++ b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/StreamingTwitter.scala
@@ -46,7 +46,7 @@ import com.google.common.base.CharMatcher
 import scala.math.BigDecimal
 import com.ibm.cds.spark.samples.config.DemoConfig
 import com.ibm.cds.spark.samples.ToneAnalyzer.ToneCategory
-import org.apache.spark.Logging
+// import org.apache.spark.Logging
 
 
 
@@ -54,7 +54,8 @@ import org.apache.spark.Logging
 /**
  * @author dtaieb
 */
-object StreamingTwitter extends Logging{
+object StreamingTwitter {
+  val log = Logger.getLogger(getClass.getName)
   var ssc: StreamingContext = null
   var sqlContext: SQLContext = null
   var workingRDD: RDD[Row] = null
@@ -152,11 +153,11 @@ object StreamingTwitter extends Logging{
             var scoreMap : Map[String, Double] = Map()
             if ( sentiment != null ){
-              for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){
-                for ( tone <- Option( toneCategory.tones ).getOrElse( Seq() ) ){
+              // for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){
+                for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){
                   scoreMap.put( tone.tone_id, tone.score )
                 }
-              }
+              // }
             }
             colValues = colValues ++ ToneAnalyzer.sentimentFactors.map { f => (BigDecimal(scoreMap.get(f._2).getOrElse(0.0)).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 }
@@ -199,14 +200,16 @@
           }
         }catch{
           case e: InterruptedException=>//Ignore
-          case e: Exception => logError(e.getMessage, e )
+          // case e: Exception => logError(e.getMessage, e )
+          case e: Exception => log.error(e.getMessage, e )
        }finally{
           canStopTwitterStream = true
        }
      })
 
    }catch{
-    case e : Exception => logError(e.getMessage, e )
+    // case e : Exception => logError(e.getMessage, e )
+    case e : Exception => log.error(e.getMessage, e )
     return
    }
    ssc.start()
@@ -239,7 +242,7 @@
     }
   }
 
-  def saveTweetToCloudant(client: Client, db: CouchDbApi, status:Status, sentiment: ToneAnalyzer.Sentiment) : Status = {
+  def saveTweetToCloudant(client: Client, db: CouchDbApi, status:Status, sentiment: ToneAnalyzer.ToneCategory) : Status = {
     if ( db != null){
       logger.trace("Creating new Tweet in Couch Database " + status.getText())
       val task:Task[Res.DocOk] = db.docs.create(
@@ -258,7 +261,8 @@
 
       // Execute the actions and process the result
       task.attemptRun match {
-        case -\/(e) => logError(e.getMessage, e );
+        // case -\/(e) => logError(e.getMessage, e );
+        case -\/(e) => log.error(e.getMessage, e );
         case \/-(a) => logger.trace("Successfully create new Tweet in Couch Database " + status.getText() )
       }
     }
@@ -282,7 +286,8 @@
 
       (sqlContext, df)
     }catch{
-      case e: Exception => {logError(e.getMessage, e ); return null}
+      // case e: Exception => {logError(e.getMessage, e ); return null}
+      case e: Exception => {log.error(e.getMessage, e ); return null}
     }
   }
 
@@ -302,4 +307,4 @@
     println("df.printSchema")
     println("sqlContext.sql(\"select author, text from tweets\").show")
   }
-}
\ No newline at end of file
+}
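The saveTweetToCloudant signature change (ToneAnalyzer.Sentiment to ToneAnalyzer.ToneCategory) is the type ripple from the model rewrite in the next file; note that StreamingTwitter stores raw tone scores in its map and only scales them when building column values. A sketch of that column computation, with an invented subset of sentimentFactors (missing tones default to 0.0):

    object SentimentColumnsSketch extends App {
      val sentimentFactors = Array(("Anger", "anger"), ("Joy", "joy")) // illustrative subset
      val scoreMap = Map("joy" -> 0.5)                                 // raw scores, unscaled

      // Mirrors the colValues mapping above: default 0.0, round, scale to percent.
      val colValues = sentimentFactors.map { f =>
        BigDecimal(scoreMap.get(f._2).getOrElse(0.0))
          .setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble * 100.0
      }
      println(colValues.mkString(", ")) // 0.0, 50.0
    }
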
diff --git a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/ToneAnalyzer.scala b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/ToneAnalyzer.scala
index 63504487..ad43f3b7 100644
--- a/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/ToneAnalyzer.scala
+++ b/streaming-twitter/src/main/scala/com/ibm/cds/spark/samples/ToneAnalyzer.scala
@@ -11,15 +11,19 @@ import org.http4s.Method
 import org.http4s.headers.Authorization
 import org.apache.log4j.Logger
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.Logging
-import scala.util.parsing.json.JSON
-import org.codehaus.jettison.json.JSONObject
+// import org.apache.spark.Logging
+import org.apache.log4j.Logger
+// import scala.util.parsing.json.JSON
+// import org.codehaus.jettison.json.JSONObject
+import org.json.JSONObject
+import scala.util.parsing.json._
 
 /**
  * @author dtaieb
 */
-object ToneAnalyzer extends Logging{
+object ToneAnalyzer {
+  val log = Logger.getLogger(getClass.getName)
 
   val sentimentFactors = Array(
     ("Anger","anger"),
@@ -38,9 +42,11 @@ object ToneAnalyzer extends Logging{
   )
 
   //Class models for Sentiment JSON
-  case class DocumentTone( document_tone: Sentiment )
-  case class Sentiment(tone_categories: Seq[ToneCategory]);
-  case class ToneCategory(category_id: String, category_name: String, tones: Seq[Tone]);
+  // case class DocumentTone( document_tone: Sentiment )
+  // case class Sentiment(tone_categories: Seq[ToneCategory]);
+  // case class ToneCategory(category_id: String, category_name: String, tones: Seq[Tone]);
+  case class DocumentTone(document_tone: ToneCategory)
+  case class ToneCategory(tones: Seq[Tone]);
   case class Tone(score: Double, tone_id: String, tone_name: String)
   // case class Sentiment( scorecard: String, children: Seq[Tone] )
   // case class Tone( name: String, id: String, children: Seq[ToneResult])
@@ -48,10 +54,11 @@ object ToneAnalyzer extends Logging{
   // case class LinguisticEvidence( evidence_score: Double, word_count: Double, correlation: String, words : Seq[String])
 
   case class Geo( lat: Double, long: Double )
-  case class Tweet(author: String, date: String, language: String, text: String, geo : Geo, sentiment : Sentiment )
+  case class Tweet(author: String, date: String, language: String, text: String, geo : Geo, sentiment : ToneCategory)
 
-  def computeSentiment( client: Client, status:StatusAdapter, broadcastVar: Broadcast[Map[String,String]] ) : Sentiment = {
-    logTrace("Calling sentiment from Watson Tone Analyzer: " + status.text)
+  def computeSentiment( client: Client, status:StatusAdapter, broadcastVar: Broadcast[Map[String,String]] ) : ToneCategory = {
+    // logTrace("Calling sentiment from Watson Tone Analyzer: " + status.text)
+    log.trace("Calling sentiment from Watson Tone Analyzer: " + status.text)
     try{
       //Get Sentiment on the tweet
       val sentimentResults: String =
@@ -90,4 +97,4 @@ object ToneAnalyzer extends Logging{
     }
   }
 }
-}
\ No newline at end of file
+}
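The commented-out case classes preserve the old nested model (document_tone -> tone_categories -> tones); the new DocumentTone wraps a single flat ToneCategory. A minimal sketch of deserializing the new shape with scala.util.parsing.json, which this file now imports (the payload below is illustrative of the flat response, not captured from the service):

    import scala.util.parsing.json.JSON

    object ToneResponseSketch extends App {
      case class Tone(score: Double, tone_id: String, tone_name: String)
      case class ToneCategory(tones: Seq[Tone])
      case class DocumentTone(document_tone: ToneCategory)

      val body = """{"document_tone":{"tones":[{"score":0.83,"tone_id":"joy","tone_name":"Joy"}]}}"""

      // JSON.parseFull yields nested Maps/Lists/Doubles; walk them into the case classes.
      val doc = JSON.parseFull(body) match {
        case Some(root: Map[String, Any] @unchecked) =>
          val dt = root("document_tone").asInstanceOf[Map[String, Any]]
          val tones = dt("tones").asInstanceOf[List[Map[String, Any]]].map { t =>
            Tone(t("score").asInstanceOf[Double],
                 t("tone_id").asInstanceOf[String],
                 t("tone_name").asInstanceOf[String])
          }
          DocumentTone(ToneCategory(tones))
        case _ => DocumentTone(ToneCategory(Seq())) // unparseable body: empty tone list
      }
      println(doc.document_tone.tones) // List(Tone(0.83,joy,Joy))
    }
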