Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hotfix/update2new dsx tone analyzer #5

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions streaming-twitter/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,36 @@ name := "streaming-twitter"

version := "1.6"

scalaVersion := "2.10.4"
scalaVersion := "2.11.4"

libraryDependencies ++= {
val sparkVersion = "1.6.0"
val sparkVersion = "2.1.1"
Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming-twitter" % sparkVersion,
// "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion,
"org.apache.bahir" %% "spark-streaming-twitter" % sparkVersion,
"org.apache.spark" %% "spark-repl" % sparkVersion % "provided",
"com.ibm" %% "couchdb-scala" % "0.5.3",
"com.ibm" %% "couchdb-scala" % "0.5.1",
"org.apache.kafka" % "kafka-log4j-appender" % "0.9.0.0",
"org.apache.kafka" % "kafka-clients" % "0.9.0.0",
"org.apache.kafka" %% "kafka" % "0.9.0.0",
"com.google.guava" % "guava" % "14.0.1"
"com.google.guava" % "guava" % "14.0.1",
"org.twitter4j" % "twitter4j-core" % "4.0.4",
"org.twitter4j" % "twitter4j-stream" % "4.0.4",
"org.json" % "json" % "20180130"
)
}

assemblyMergeStrategy in assembly := {
case PathList("org", "apache", "spark", xs @ _*) => MergeStrategy.first
case PathList("scala", xs @ _*) => MergeStrategy.discard
case PathList("com", "ibm", "pixiedust", xs @ _*) => MergeStrategy.discard
case PathList("org", "apache", "spark", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
case PathList("com", "google", xs @ _*) => MergeStrategy.first
case PathList("org", "apache", xs @ _*) => MergeStrategy.first
case PathList("javax", "xml", xs @ _*) => MergeStrategy.first
case PathList("scala", xs @ _*) => MergeStrategy.discard
case PathList("com", "ibm", "pixiedust", xs @ _*) => MergeStrategy.discard
case PathList("META-INF", "maven", "org.slf4j", xs @ _* ) => MergeStrategy.first
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
Expand All @@ -32,6 +40,7 @@ assemblyMergeStrategy in assembly := {

// Directory for unmanaged dependencies: <project base>/lib.
// Written with `:=` and `.value` because the `<<=` operator is deprecated
// (sbt 0.13.13) and removed in sbt 1.x; the resulting setting is the same.
unmanagedBase := baseDirectory.value / "lib"

//Important line below. This strips out all the Scala dependencies and shrinks your jar down into a skinny jar
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

resolvers += "scalaz-bintray" at "https://dl.bintray.com/scalaz/releases"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,22 @@ import twitter4j.Status
import twitter4j.StatusDeletionNotice
import twitter4j.StatusListener
import twitter4j.TwitterStreamFactory
import twitter4j.TwitterStream
import scala.util.parsing.json.JSON
import java.io.InputStream
import twitter4j.TwitterStream
import com.ibm.cds.spark.samples.config.DemoConfig
import org.apache.spark.Logging
// import org.apache.spark.Logging
import org.apache.log4j.Logger


/**
* @author dtaieb
*/
object KafkaProducerTest extends Logging{
object KafkaProducerTest {
//Very verbose, enable only if necessary
//Logger.getLogger("org.apache.kafka").setLevel(Level.ALL)
//Logger.getLogger("kafka").setLevel(Level.ALL)
val log = Logger.getLogger(getClass.getName)

var twitterStream : TwitterStream = _;

Expand Down Expand Up @@ -79,11 +81,13 @@ object KafkaProducerTest extends Logging{
def onStatus(status: Status){
if ( lastSent == 0 || System.currentTimeMillis() - lastSent > 200L){
lastSent = System.currentTimeMillis()
logInfo("Got a status " + status.getText )
// logInfo("Got a status " + status.getText )
log.info("Got a status " + status.getText )
val producerRecord = new ProducerRecord(kafkaProps.getConfig(MessageHubConfig.KAFKA_TOPIC_TWEETS ), "tweet", status )
try{
val metadata = kafkaProducer.send( producerRecord ).get(2000, TimeUnit.SECONDS);
logInfo("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset )
// logInfo("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset )
log.info("Successfully sent record: Topic: " + metadata.topic + " Offset: " + metadata.offset )
}catch{
case e:Throwable => e.printStackTrace
}
Expand All @@ -98,7 +102,8 @@ object KafkaProducerTest extends Logging{

def onException( e: Exception){
println("Unexpected error from twitterStream: " + e.getMessage);
logError(e.getMessage, e)
// logError(e.getMessage, e)
log.error(e.getMessage, e)
}

def onScrubGeo(lat: Long, long: Long ){
Expand Down Expand Up @@ -149,4 +154,4 @@ object KafkaConsumerTest {
}
}).start
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,17 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.Logging
// import org.apache.spark.Logging
import org.apache.log4j.Logger

import java.util.Arrays

/**
* @author dtaieb
* Twitter+Watson sample app with MessageHub/Kafka
*/
object MessageHubStreamingTwitter extends Logging{
object MessageHubStreamingTwitter {
val log = Logger.getLogger(getClass.getName)

var ssc: StreamingContext = null
val reuseCheckpoint = false;
Expand All @@ -70,7 +73,7 @@ object MessageHubStreamingTwitter extends Logging{

//Logger.getLogger("org.apache.kafka").setLevel(Level.ALL)
//Logger.getLogger("kafka").setLevel(Level.ALL)
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
// Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

def main(args: Array[String]): Unit = {
println("Printing arguments: ");
Expand Down Expand Up @@ -151,10 +154,12 @@ object MessageHubStreamingTwitter extends Logging{
if ( task != null ){
val producerRecord = new ProducerRecord[String,String](task._1, "tweet", task._2 )
val metadata = kafkaProducer.send( producerRecord ).get;
logInfo("Sent record " + metadata.offset() + " Topic " + task._1)
// logInfo("Sent record " + metadata.offset() + " Topic " + task._1)
log.info("Sent record " + metadata.offset() + " Topic " + task._1)
}
}catch{
case e:Throwable => logError(e.getMessage, e)
// case e:Throwable => logError(e.getMessage, e)
case e:Throwable => log.error(e.getMessage, e)
}
}
queue.synchronized{
Expand Down Expand Up @@ -206,11 +211,11 @@ object MessageHubStreamingTwitter extends Logging{
val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar )
var scoreMap : Map[String, Double] = Map()
if ( sentiment != null ){
for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){
for ( tone <- Option( toneCategory.tones ).getOrElse( Seq() ) ){
// for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){
for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){
scoreMap.put( tone.tone_id, (BigDecimal(tone.score).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 )
}
}
//}
}

EnrichedTweet(
Expand Down Expand Up @@ -240,7 +245,8 @@ object MessageHubStreamingTwitter extends Logging{
try{
queue.notify
}catch{
case e:Throwable=>logError(e.getMessage, e)
// case e:Throwable=>logError(e.getMessage, e)
case e:Throwable=>log.error(e.getMessage, e)
}
}
})
Expand Down Expand Up @@ -302,7 +308,8 @@ object MessageHubStreamingTwitter extends Logging{
try{
queue.notify
}catch{
case e:Throwable=>logError(e.getMessage, e)
// case e:Throwable=>logError(e.getMessage, e)
case e:Throwable=>log.error(e.getMessage, e)
}
}
}
Expand All @@ -322,7 +329,8 @@ object MessageHubStreamingTwitter extends Logging{
}
}

object TweetsMetricJsonSerializer extends Logging{
object TweetsMetricJsonSerializer {
val log = Logger.getLogger(getClass.getName)
def serialize(value: Seq[(String,Long)] ): String = {
val sb = new StringBuilder("[")
var comma = ""
Expand All @@ -331,12 +339,14 @@ object TweetsMetricJsonSerializer extends Logging{
comma=","
})
sb.append("]")
logInfo("Serialized json: " + sb)
// logInfo("Serialized json: " + sb)
log.info("Serialized json: " + sb)
sb.toString()
}
}

object ToneScoreJsonSerializer extends Logging{
object ToneScoreJsonSerializer {
val log = Logger.getLogger(getClass.getName)
def serializeList[U:ClassTag]( label: String, value: List[U] ):String = {
val sb = new StringBuilder("[\"" + label.replaceAll("\"", "") + "\"")
value.foreach { item => {
Expand Down Expand Up @@ -364,7 +374,8 @@ object ToneScoreJsonSerializer extends Logging{
comma=","
})
sb.append("]")
logInfo("Serialized size: " + value.size + ". Tone json: " + sb)
// logInfo("Serialized size: " + value.size + ". Tone json: " + sb)
log.info("Serialized size: " + value.size + ". Tone json: " + sb)
sb.toString()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package com.ibm.cds.spark.samples

import scala.collection.mutable._
import com.ibm.pixiedust.ChannelReceiver
import org.apache.spark.Logging
// import org.apache.spark.Logging
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkContext
Expand Down Expand Up @@ -49,15 +49,16 @@ import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.HashPartitioner
import twitter4j.Status
import org.codehaus.jettison.json.JSONObject
// import org.codehaus.jettison.json.JSONObject
import org.json.JSONObject
import org.apache.spark.AccumulableParam
import org.apache.spark.streaming.StreamingContextState
import org.apache.spark.sql.DataFrame

/* @author dtaieb
* Twitter+Watson sentiment analysis app powered by Pixiedust
*/
object PixiedustStreamingTwitter extends ChannelReceiver() with Logging{
object PixiedustStreamingTwitter extends ChannelReceiver() {
var ssc: StreamingContext = null
var workingRDD: RDD[Row] = null
//Hold configuration key/value pairs
Expand Down Expand Up @@ -222,11 +223,11 @@ object PixiedustStreamingTwitter extends ChannelReceiver() with Logging{
val sentiment = ToneAnalyzer.computeSentiment( client, status, broadcastVar )
var scoreMap : Map[String, Double] = Map()
if ( sentiment != null ){
for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){
for ( tone <- Option( toneCategory.tones ).getOrElse( Seq() ) ){
// for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){
for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){
scoreMap.put( tone.tone_id, (BigDecimal(tone.score).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 )
}
}
// }
}

var jsonSentiment="{";
Expand Down Expand Up @@ -342,4 +343,4 @@ object TweetsAccumulatorParam extends AccumulableParam[Array[(String,String)], (
def addAccumulator(current:Array[(String,String)], s:(String,String)):Array[(String,String)] = {
current :+ s
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ import com.google.common.base.CharMatcher
import scala.math.BigDecimal
import com.ibm.cds.spark.samples.config.DemoConfig
import com.ibm.cds.spark.samples.ToneAnalyzer.ToneCategory
import org.apache.spark.Logging
// import org.apache.spark.Logging




/**
* @author dtaieb
*/
object StreamingTwitter extends Logging{
object StreamingTwitter {
val log = Logger.getLogger(getClass.getName)
var ssc: StreamingContext = null
var sqlContext: SQLContext = null
var workingRDD: RDD[Row] = null
Expand Down Expand Up @@ -152,11 +153,11 @@ object StreamingTwitter extends Logging{

var scoreMap : Map[String, Double] = Map()
if ( sentiment != null ){
for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){
for ( tone <- Option( toneCategory.tones ).getOrElse( Seq() ) ){
// for( toneCategory <- Option(sentiment.tone_categories).getOrElse( Seq() )){
for ( tone <- Option( sentiment.tones ).getOrElse( Seq() ) ){
scoreMap.put( tone.tone_id, tone.score )
}
}
// }
}

colValues = colValues ++ ToneAnalyzer.sentimentFactors.map { f => (BigDecimal(scoreMap.get(f._2).getOrElse(0.0)).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble) * 100.0 }
Expand Down Expand Up @@ -199,14 +200,16 @@ object StreamingTwitter extends Logging{
}
}catch{
case e: InterruptedException=>//Ignore
case e: Exception => logError(e.getMessage, e )
// case e: Exception => logError(e.getMessage, e )
case e: Exception => log.error(e.getMessage, e )
}finally{
canStopTwitterStream = true
}
})

}catch{
case e : Exception => logError(e.getMessage, e )
// case e : Exception => logError(e.getMessage, e )
case e : Exception => log.error(e.getMessage, e )
return
}
ssc.start()
Expand Down Expand Up @@ -239,7 +242,7 @@ object StreamingTwitter extends Logging{
}
}

def saveTweetToCloudant(client: Client, db: CouchDbApi, status:Status, sentiment: ToneAnalyzer.Sentiment) : Status = {
def saveTweetToCloudant(client: Client, db: CouchDbApi, status:Status, sentiment: ToneAnalyzer.ToneCategory) : Status = {
if ( db != null){
logger.trace("Creating new Tweet in Couch Database " + status.getText())
val task:Task[Res.DocOk] = db.docs.create(
Expand All @@ -258,7 +261,8 @@ object StreamingTwitter extends Logging{

// Execute the actions and process the result
task.attemptRun match {
case -\/(e) => logError(e.getMessage, e );
// case -\/(e) => logError(e.getMessage, e );
case -\/(e) => log.error(e.getMessage, e );
case \/-(a) => logger.trace("Successfully create new Tweet in Couch Database " + status.getText() )
}
}
Expand All @@ -282,7 +286,8 @@ object StreamingTwitter extends Logging{

(sqlContext, df)
}catch{
case e: Exception => {logError(e.getMessage, e ); return null}
// case e: Exception => {logError(e.getMessage, e ); return null}
case e: Exception => {log.error(e.getMessage, e ); return null}
}
}

Expand All @@ -302,4 +307,4 @@ object StreamingTwitter extends Logging{
println("df.printSchema")
println("sqlContext.sql(\"select author, text from tweets\").show")
}
}
}
Loading