Skip to content

Commit

Permalink
Fix NPE
Browse files Browse the repository at this point in the history
  • Loading branch information
pbernet committed May 22, 2024
1 parent c5fcb6e commit 4be426b
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import org.apache.pekko.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import org.slf4j.{Logger, LoggerFactory}

import java.util.Locale
import scala.concurrent.Future
import scala.concurrent.duration.*
import scala.language.postfixOps
import scala.util.{Failure, Success}
Expand Down Expand Up @@ -47,10 +46,10 @@ class Kafka2SSE(mappedPortKafka: Int = 9092) {
clientKillSwitch = backoffClient(address, port)
}

def stop(): Future[Http.HttpTerminated] = {
def stop(): Unit = {
logger.info("Stopping...")
clientKillSwitch.shutdown()
serverBinding.terminate(10.seconds)
if (serverBinding != null) serverBinding.terminate(10.seconds) else {}
}

private def createConsumerSettings(group: String): ConsumerSettings[String, String] = {
Expand Down Expand Up @@ -91,10 +90,10 @@ class Kafka2SSE(mappedPortKafka: Int = 9092) {
val bindingFuture = Http().newServerAt(address, port).bindFlow(route)
bindingFuture.onComplete {
case Success(binding) =>
logger.info("Server started, listening on: " + binding.localAddress)
logger.info(s"Server started, listening on: {}", binding.localAddress)
serverBinding = binding
case Failure(e) =>
logger.info(s"Server could not bind to $address:$port. Exception message: ${e.getMessage}")
logger.info(s"Server could not bind to: $address:$port. Exception message: ${e.getMessage}")
system.terminate()
}
}
Expand Down

0 comments on commit 4be426b

Please sign in to comment.