Skip to content

Commit

Permalink
Rewrite with avro4s
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed Jul 13, 2024
1 parent a8b3bc3 commit 743c013
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 182 deletions.
5 changes: 2 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ libraryDependencies ++= Seq(
// JSON (un)marshalling support for pekko-http
"org.apache.pekko" %% "pekko-http-spray-json" % pekkoHTTPVersion,
"org.apache.pekko" %% "pekko-http-xml" % pekkoHTTPVersion,

// JSON (un)marshalling in Java examples
"org.json" % "json" % "20240303",

Expand Down Expand Up @@ -104,8 +104,7 @@ libraryDependencies ++= Seq(
"org.apache.httpcomponents.core5" % "httpcore5" % "5.2.4",
"commons-io" % "commons-io" % "2.16.1",
"org.apache.commons" % "commons-lang3" % "3.12.0",
"com.twitter" %% "bijection-avro" % "0.9.7",

"com.sksamuel.avro4s" %% "avro4s-core" % "4.1.2",

"org.apache.camel" % "camel-core" % "3.20.2",
"org.apache.camel" % "camel-reactive-streams" % "3.20.2",
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala/alpakka/kafka/avro/AvroRecord.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package alpakka.kafka.avro

case class AvroRecord(str1: String, str2: String, int1: Int)
60 changes: 0 additions & 60 deletions src/main/scala/alpakka/kafka/avro/AvroSerializer.java

This file was deleted.

53 changes: 0 additions & 53 deletions src/main/scala/alpakka/kafka/avro/SimpleAvroConsumer.java

This file was deleted.

46 changes: 46 additions & 0 deletions src/main/scala/alpakka/kafka/avro/SimpleAvroConsumer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package alpakka.kafka.avro

import com.sksamuel.avro4s.*
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters.*

/**
* Not pekko streams related
*
* Prerequisite:
* Run [[alpakka.env.KafkaServerEmbedded]]
* Run [[alpakka.kafka.avro.SimpleAvroProducer]]
*/
object SimpleAvroConsumer extends App {
val props = new Properties()
props.put("bootstrap.servers", "localhost:29092")
props.put("group.id", "mygroup")
props.put("key.deserializer", classOf[StringDeserializer].getName)
props.put("value.deserializer", classOf[AvroDeserializer].getName)

val consumer = new KafkaConsumer[String, AvroRecord](props)
consumer.subscribe(List("avro-topic").asJava)

var running = true
while (running) {
val records = consumer.poll(Duration.ofMillis(100))
for (record: ConsumerRecord[String, AvroRecord] <- records.asScala) {
val avroRecord = record.value()
println(s"Receiving record: str1=${avroRecord.str1}, str2=${avroRecord.str2}, int1=${avroRecord.int1}")
}
}
}

class AvroDeserializer extends org.apache.kafka.common.serialization.Deserializer[AvroRecord] {
override def deserialize(topic: String, data: Array[Byte]): AvroRecord = {
val avroSchema = AvroSchema[AvroRecord]
val avroInputStream = AvroInputStream.binary[AvroRecord].from(data).build(avroSchema)
val result = avroInputStream.iterator.next()
avroInputStream.close()
result
}
}
57 changes: 0 additions & 57 deletions src/main/scala/alpakka/kafka/avro/SimpleAvroProducer.java

This file was deleted.

42 changes: 42 additions & 0 deletions src/main/scala/alpakka/kafka/avro/SimpleAvroProducer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package alpakka.kafka.avro

import com.sksamuel.avro4s.*
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.Properties

/**
* Not pekko streams related
*
* Prerequisite:
* Run [[alpakka.env.KafkaServerEmbedded]]
*/
object SimpleAvroProducer extends App {
val props = new Properties()
props.put("bootstrap.servers", "localhost:29092")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[AvroSerializer].getName)

val producer = new KafkaProducer[String, AvroRecord](props)

try for (i <- 0 until 100) {
val avroRecord = AvroRecord(s"Str 1-$i", s"Str 2-$i", i)
println(s"Sending record: $avroRecord")

val record = new ProducerRecord[String, AvroRecord]("avro-topic", avroRecord)
producer.send(record)

Thread.sleep(100)
} finally producer.close()
}

class AvroSerializer extends org.apache.kafka.common.serialization.Serializer[AvroRecord] {
override def serialize(topic: String, data: AvroRecord): Array[Byte] = {
val baos = new java.io.ByteArrayOutputStream()
val avroOutputStream = AvroOutputStream.binary[AvroRecord].to(baos).build()
avroOutputStream.write(data)
avroOutputStream.close()
baos.toByteArray
}
}
9 changes: 0 additions & 9 deletions src/main/scala/alpakka/kafka/avro/record.avsc

This file was deleted.

0 comments on commit 743c013

Please sign in to comment.