Skip to content

Commit

Permalink
Switch from play-json to circe and cleanup build.sbt
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Feb 2, 2024
1 parent 0981ee9 commit 6060edf
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 62 deletions.
33 changes: 17 additions & 16 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,22 @@ libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-slf4j" % pekkoVersion,

"org.apache.pekko" %% "pekko-http" % pekkoHTTPVersion,
// JSON (un)marshalling support for pekko-http
"org.apache.pekko" %% "pekko-http-spray-json" % pekkoHTTPVersion,
// JSON (un)marshalling in Java examples
"org.json" % "json" % "20231013",

// sttp wraps around akka-http to allow for concise clients
"io.circe" %% "circe-core" % "0.14.6",
"io.circe" %% "circe-generic" % "0.14.6",
"io.circe" %% "circe-parser" % "0.14.6",

// sttp wraps around pekko-http to allow for concise clients
"com.softwaremill.sttp.client3" %% "core" % sttpVersion,
"com.softwaremill.sttp.client3" %% "pekko-http-backend" % sttpVersion,

"org.apache.activemq" % "activemq-client" % activemqVersion exclude("com.fasterxml.jackson.core", "jackson-databind"),
"org.apache.activemq" % "activemq-broker" % activemqVersion exclude("com.fasterxml.jackson.core", "jackson-databind"),
"org.apache.activemq" % "activemq-kahadb-store" % activemqVersion exclude("com.fasterxml.jackson.core", "jackson-databind"),
"org.apache.activemq" % "activemq-client" % activemqVersion,
"org.apache.activemq" % "activemq-broker" % activemqVersion,
"org.apache.activemq" % "activemq-kahadb-store" % activemqVersion,
"org.apache.pekko" %% "pekko-connectors-jms" % pekkoConnectorVersion,
"javax.jms" % "jms" % "1.1",
"javax.xml.bind" % "jaxb-api" % "2.3.1",
Expand Down Expand Up @@ -70,12 +76,11 @@ libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-s3" % pekkoConnectorVersion,

"org.apache.pekko" %% "pekko-connectors-kinesis" % pekkoConnectorVersion,
// Use latest. Ref in alpakka: 2.17.113
"software.amazon.awssdk" % "kinesis" % awsClientVersion exclude("com.fasterxml.jackson.core", "jackson-databind"),
"software.amazon.awssdk" % "apache-client" % awsClientVersion exclude("com.fasterxml.jackson.core", "jackson-databind"),
"software.amazon.awssdk" % "kinesis" % awsClientVersion,
"software.amazon.awssdk" % "apache-client" % awsClientVersion,

"org.apache.pekko" %% "pekko-connectors-sqs" % pekkoConnectorVersion,
"software.amazon.awssdk" % "sqs" % awsClientVersion exclude("com.fasterxml.jackson.core", "jackson-databind"),
"software.amazon.awssdk" % "sqs" % awsClientVersion,

"org.squbs" %% "squbs-ext" % "0.15.0", // not (yet) migrated to pekko

Expand All @@ -93,9 +98,6 @@ libraryDependencies ++= Seq(

"com.crowdscriber.captions" %% "caption-parser" % "0.1.6",

"com.typesafe.play" %% "play-json" % "2.9.4",
"org.apache.pekko" %% "pekko-serialization-jackson" % pekkoVersion,

"org.apache.httpcomponents.client5" % "httpclient5" % "5.3.1",
"org.apache.httpcomponents.core5" % "httpcore5" % "5.2.4",
"commons-io" % "commons-io" % "2.11.0",
Expand All @@ -111,7 +113,7 @@ libraryDependencies ++= Seq(
"io.reactivex.rxjava3" % "rxjava" % "3.1.6",

"com.github.blemale" %% "scaffeine" % "5.2.1",
"ch.qos.logback" % "logback-classic" % "1.4.7",
"ch.qos.logback" % "logback-classic" % "1.4.12",

"org.testcontainers" % "testcontainers" % testContainersVersion,
"org.testcontainers" % "elasticsearch" % testContainersVersion,
Expand All @@ -133,10 +135,9 @@ libraryDependencies ++= Seq(
"org.junit.jupiter" % "junit-jupiter-engine" % "5.9.2" % Test,
"org.junit.jupiter" % "junit-jupiter-api" % "5.9.2" % Test,

// org.keycloak introduces com.fasterxml.jackson.core:jackson-core:2.12.1, which causes runtime ex
"org.keycloak" % "keycloak-core" % keycloakVersion exclude("com.fasterxml.jackson.core", "jackson-databind"),
"org.keycloak" % "keycloak-adapter-core" % keycloakVersion exclude("com.fasterxml.jackson.core", "jackson-databind"),
"org.keycloak" % "keycloak-admin-client" % keycloakVersion exclude("com.fasterxml.jackson.core", "jackson-databind"),
"org.keycloak" % "keycloak-core" % keycloakVersion,
"org.keycloak" % "keycloak-adapter-core" % keycloakVersion,
"org.keycloak" % "keycloak-admin-client" % keycloakVersion,
"org.jboss.spec.javax.ws.rs" % "jboss-jaxrs-api_2.1_spec" % "2.0.2.Final",

"org.postgresql" % "postgresql" % "42.6.0",
Expand Down
29 changes: 12 additions & 17 deletions src/main/scala/alpakka/sse/SSEClientWikipediaEdits.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package alpakka.sse

import io.circe._
import io.circe.parser._
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
Expand All @@ -9,16 +11,13 @@ import org.apache.pekko.stream.connectors.sse.scaladsl.EventSource
import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
import org.apache.pekko.stream.{Supervision, ThrottleMode}
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json._

import java.time.{Instant, ZoneId}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.sys.process._
import scala.util.Try
import scala.util.control.NonFatal


case class Change(timestamp: Long, serverName: String, user: String, cmdType: String, isBot: Boolean, isNamedBot: Boolean, lengthNew: Int = 0, lengthOld: Int = 0) {
override def toString = {
val localDateTime = Instant.ofEpochSecond(timestamp).atZone(ZoneId.systemDefault()).toLocalDateTime
Expand Down Expand Up @@ -69,26 +68,22 @@ object SSEClientWikipediaEdits extends App {
val parserFlow: Flow[ServerSentEvent, Change, NotUsed] = Flow[ServerSentEvent].map {
event: ServerSentEvent => {

def tryToInt(s: String) = Try(s.toInt).toOption.getOrElse(0)

def isNamedBot(bot: Boolean, user: String): Boolean = {
if (bot) user.toLowerCase().contains("bot") else false
}

val timestamp = (Json.parse(event.data) \ "timestamp").as[Long]

val serverName = (Json.parse(event.data) \ "server_name").as[String]

val user = (Json.parse(event.data) \ "user").as[String]

val cmdType = (Json.parse(event.data) \ "type").as[String]

val bot = (Json.parse(event.data) \ "bot").as[Boolean]
val cursor = parse(event.data).getOrElse(Json.Null).hcursor
val timestamp: Long = cursor.get[Long]("timestamp").toOption.getOrElse(0)
val serverName = cursor.get[String]("server_name").toOption.getOrElse("")
val user = cursor.get[String]("user").toOption.getOrElse("")
val cmdType = cursor.get[String]("type").toOption.getOrElse("")
val bot = cursor.get[Boolean]("bot").toOption.getOrElse(false)

if (cmdType == "new" || cmdType == "edit") {
val lengthNew = (Json.parse(event.data) \ "length" \ "new").getOrElse(JsString("0")).toString()
val lengthOld = (Json.parse(event.data) \ "length" \ "old").getOrElse(JsString("0")).toString()
Change(timestamp, serverName, user, cmdType, isBot = bot, isNamedBot = isNamedBot(bot, user), tryToInt(lengthNew), tryToInt(lengthOld))
val length = cursor.downField("length")
val lengthNew = length.get[Int]("new").toOption.getOrElse(0)
val lengthOld = length.get[Int]("old").toOption.getOrElse(0)
Change(timestamp, serverName, user, cmdType, isBot = bot, isNamedBot = isNamedBot(bot, user), lengthNew, lengthOld)
} else {
Change(timestamp, serverName, user, cmdType, isBot = bot, isNamedBot = isNamedBot(bot, user))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package alpakka.sse_to_elasticsearch

import io.circe._
import io.circe.generic.auto._
import io.circe.parser._
import opennlp.tools.namefind.{NameFinderME, TokenNameFinderModel}
import opennlp.tools.tokenize.{TokenizerME, TokenizerModel}
import opennlp.tools.util.Span
Expand All @@ -18,7 +21,6 @@ import org.apache.pekko.stream.{ActorAttributes, RestartSettings, Supervision}
import org.opensearch.testcontainers.OpensearchContainer
import org.slf4j.{Logger, LoggerFactory}
import org.testcontainers.utility.DockerImageName
import play.api.libs.json.{JsArray, JsString, Json}
import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat

Expand All @@ -29,7 +31,6 @@ import java.time.{Instant, ZoneId}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.sys.process.{Process, stringSeqToProcess}
import scala.util.Try
import scala.util.control.NonFatal

/**
Expand Down Expand Up @@ -138,35 +139,29 @@ object SSEtoElasticsearch extends App {
val parserFlow: Flow[ServerSentEvent, Change, NotUsed] = Flow[ServerSentEvent].map {
event: ServerSentEvent => {

def tryToInt(s: String) = Try(s.toInt).toOption.getOrElse(0)

def isNamedBot(bot: Boolean, user: String): Boolean = {
if (bot) user.toLowerCase().contains("bot") else false
}

// We use the title as identifier
val title = (Json.parse(event.data) \ "title").as[String]

val timestamp = (Json.parse(event.data) \ "timestamp").as[Long]

val serverName = (Json.parse(event.data) \ "server_name").as[String]

val user = (Json.parse(event.data) \ "user").as[String]
val cursor = parse(event.data).getOrElse(Json.Null).hcursor

val cmdType = (Json.parse(event.data) \ "type").as[String]

val bot = (Json.parse(event.data) \ "bot").as[Boolean]
val titleAsID = cursor.get[String]("title").toOption.getOrElse("")
val timestamp: Long = cursor.get[Long]("timestamp").toOption.getOrElse(0)
val serverName = cursor.get[String]("server_name").toOption.getOrElse("")
val user = cursor.get[String]("user").toOption.getOrElse("")
val cmdType = cursor.get[String]("type").toOption.getOrElse("")
val bot = cursor.get[Boolean]("bot").toOption.getOrElse(false)

if (cmdType == "new" || cmdType == "edit") {
val lengthNew = (Json.parse(event.data) \ "length" \ "new").getOrElse(JsString("0")).toString()
val lengthOld = (Json.parse(event.data) \ "length" \ "old").getOrElse(JsString("0")).toString()
Change(timestamp, title, serverName, user, cmdType, isBot = bot, isNamedBot = isNamedBot(bot, user), tryToInt(lengthNew), tryToInt(lengthOld))
val length = cursor.downField("length")
val lengthNew = length.get[Int]("new").toOption.getOrElse(0)
val lengthOld = length.get[Int]("old").toOption.getOrElse(0)
Change(timestamp, titleAsID, serverName, user, cmdType, isBot = bot, isNamedBot = isNamedBot(bot, user), lengthNew, lengthOld)
} else {
Change(timestamp, title, serverName, user, cmdType, isBot = bot, isNamedBot = isNamedBot(bot, user))
Change(timestamp, titleAsID, serverName, user, cmdType, isBot = bot, isNamedBot = isNamedBot(bot, user))
}
}
}

def fetchContent(ctx: Ctx): Future[Ctx] = {
logger.info(s"About to read `extract` from Wikipedia entry with title: ${ctx.change.title}")
val encodedTitle = URLEncoder.encode(ctx.change.title, "UTF-8")
Expand Down Expand Up @@ -210,9 +205,11 @@ object SSEtoElasticsearch extends App {
val content = ctx.content
val resultRaw = new NerRequestOpenAI().run(content)

val choices: JsArray = (Json.parse(resultRaw) \ "choices").as[JsArray]
val text = (Json.parse(choices.value(0).toString()) \ "text").as[String]
val personsFound = text.split("\n").filter(_.nonEmpty).toList
case class Choice(text: String, score: Double)

val cursor = parse(resultRaw).getOrElse(Json.Null).hcursor
val choices = cursor.downField("choices").as[Seq[Choice]].toOption.getOrElse(List.empty)
val personsFound = choices.head.text.split("\n").filter(_.nonEmpty).toList
if (personsFound.isEmpty) {
Future(ctx)
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/sample/stream_actor/Total.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Total extends Actor {

override def receive: Receive = {
case Increment(value, avg, id) =>
println(s"Received $value new measurements from turbine with id: $id - Avg wind speed is: $avg")
println(s"Received: $value new measurements from turbine with id: $id - Avg wind speed is: $avg")
total = total + value

val date = new Date()
Expand Down
11 changes: 6 additions & 5 deletions src/main/scala/sample/stream_actor/WindTurbineServer.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package sample.stream_actor

import io.circe.generic.auto._
import org.apache.pekko.Done
import org.apache.pekko.actor.{ActorSystem, Props}
import org.apache.pekko.http.scaladsl.Http
Expand All @@ -9,7 +10,6 @@ import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.stream.scaladsl.{Flow, Source}
import org.apache.pekko.util.Timeout
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json._
import sample.stream_actor.Total.Increment

import java.time.LocalTime
Expand All @@ -36,9 +36,10 @@ object WindTurbineServer {
object Messages {

def parse(messages: immutable.Seq[String]): Seq[MeasurementsContainer] = messages.map { message =>
implicit val measurementsFormat = Json.format[Measurements]
implicit val windTurbineDataFormat = Json.format[MeasurementsContainer]
Json.parse(message).as[MeasurementsContainer]
io.circe.parser
.parse(message)
.flatMap(_.as[MeasurementsContainer])
.toOption.get
}

def ack(aString: String) = TextMessage(Source.single("Ack from server: " + aString))
Expand Down Expand Up @@ -67,7 +68,7 @@ object WindTurbineServer {
.mapAsync(1)(identity)
.groupedWithin(100, 1.second)
.map(messages => (messages.last, Messages.parse(messages)))
.map { elem => println(s"After parsing size: ${elem._2.size}"); elem }
//.wireTap(elem => println(s"After parsing size: ${elem._2.size}"))
.mapAsync(1) {
case (lastMessage: String, measurements: Seq[MeasurementsContainer]) =>
import org.apache.pekko.pattern.ask
Expand Down

0 comments on commit 6060edf

Please sign in to comment.