Skip to content

Commit

Permalink
some syntax refactoring and idiomatic scala
Browse files Browse the repository at this point in the history
  • Loading branch information
anicolaspp committed Dec 11, 2018
1 parent f7eec5a commit 670e68f
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 49 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<version>1.0.0</version>

<properties>
<scala.version>2.12.2</scala.version>
<scala.version>2.12.8</scala.version>
</properties>

<repositories>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.github.anicolaspp.rabbites.mapres

import java.util.Properties

import com.github.anicolaspp.rabbites.syntax.Predef._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

import scala.util.Try
Expand All @@ -16,9 +17,9 @@ object Producer {

private val TOPIC = streamName + ":" + topic

override def produce(message: String): Try[RecordMetadata] =
override def produce(message: String): Try[RecordMetadata] =
new ProducerRecord[String, String](TOPIC, message).sendWith(producer)

private lazy val producer = {
val props = new Properties()
props.setProperty("batch.size", "16384")
Expand All @@ -29,8 +30,4 @@ object Producer {
new KafkaProducer[String, String](props)
}
}

implicit class RichProducerRecord[A, B](record: ProducerRecord[A, B]) {
def sendWith(producer: KafkaProducer[A, B]) = Try { producer.send(record).get() }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.github.anicolaspp.rabbites.mq
object IdCounter {
private var currentId = 0

def next() = synchronized {
def next(): Int = synchronized {
val nextId = currentId

currentId += 1
Expand Down

This file was deleted.

27 changes: 22 additions & 5 deletions src/main/scala/com/github/anicolaspp/rabbites/mq/Receiver.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.github.anicolaspp.rabbites.mq

import com.github.anicolaspp.rabbites.mapres.Producer
import com.rabbitmq.client.{CancelCallback, Channel}
import com.rabbitmq.client.{Channel, DeliverCallback, Delivery}

import scala.util.{Failure, Success}

sealed trait Receiver {
def id(): Int
Expand All @@ -24,12 +26,27 @@ object Receiver {

channel.queueDeclare(queueName, false, false, false, null)

channel.basicConsume(queueName, false, deliveryProcessing(producer), new CancelCallback {
override def handle(consumerTag: String): Unit = {}
})
channel.basicConsume(queueName, false, callBack(producer, channel, id()), (_: String) => {})
}

private def callBack(producer: Producer, channel: Channel, id: Int): DeliverCallback = (_: String, delivery: Delivery) => {
val message = new String(delivery.getBody, "UTF-8")

println(s" [x] [$id] Received $message")

producer.produce(message) match {
case Success(metadata) => {
println(s" [x] [$id] TOPIC:${metadata.topic()}; OFFSET: ${metadata.offset()}")

private def deliveryProcessing(producer: Producer) = RabbitMessageProcessor(producer, channel, id())
channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
}
case Failure(exception) => {
println(s" [x] [$id] Error $exception")

channel.basicReject(delivery.getEnvelope.getDeliveryTag, true)
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package com.github.anicolaspp.rabbites.mq
import com.github.anicolaspp.rabbites.mapres.Producer
import com.rabbitmq.client.Channel

sealed trait ReceiverPool {
trait ReceiverPool {
def start(receivers: Int): Unit
}

object ReceiverPool {
def apply(channel: Channel, queueName: String, producerSupplier: () => Producer): ReceiverPool = new ReceiverPool {
override def start(receivers: Int): Unit =
(1 to receivers).foreach { _ => Receiver(channel, queueName).runWith(producerSupplier.apply()) }
}
def apply(channel: Channel, queueName: String, producerSupplier: () => Producer): ReceiverPool =
(receivers: Int) => (1 to receivers).foreach { _ => Receiver(channel, queueName).runWith(producerSupplier.apply()) }
}
13 changes: 13 additions & 0 deletions src/main/scala/com/github/anicolaspp/rabbites/syntax/Predef.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.github.anicolaspp.rabbites.syntax

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.util.Try

object Predef {

implicit class RichProducerRecord[A, B](record: ProducerRecord[A, B]) {
def sendWith(producer: KafkaProducer[A, B]) = Try { producer.send(record).get() }
}

}

0 comments on commit 670e68f

Please sign in to comment.