From ea7e6784908c6b1608b805471af1fe21cfd396f3 Mon Sep 17 00:00:00 2001 From: pbernet Date: Wed, 22 May 2024 08:42:11 +0200 Subject: [PATCH] Migrate to Java 17 --- .github/workflows/ci.yml | 6 +- build.sbt | 4 +- src/main/java/actor/DemoMessagesActor.java | 7 +- src/main/scala/akkahttp/ReverseProxy.scala | 4 +- .../alpakka/influxdb/InfluxdbReader.scala | 37 +++---- src/main/scala/sample/stream/TcpEcho.scala | 27 +++-- src/main/scala/sample/stream/TcpEchoJava.java | 104 ------------------ .../alpakka/clickhousedb/ClickhousedbIT.java | 22 ++-- .../scala/alpakka/influxdb/InfluxdbIT.java | 3 +- 9 files changed, 50 insertions(+), 164 deletions(-) delete mode 100644 src/main/scala/sample/stream/TcpEchoJava.java diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 60061759..cbf9c946 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,11 +14,11 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 - # https://github.com/coursier/setup-action - - name: Set up JDK 11 + # https://github.com/coursier/setup-action + - name: Set up JDK 17 uses: coursier/setup-action@v1 with: - jvm: adopt:11 + jvm: adopt:17 apps: sbtn - name: Build and Test run: sbt -v +test \ No newline at end of file diff --git a/build.sbt b/build.sbt index 66425a90..e07f6613 100644 --- a/build.sbt +++ b/build.sbt @@ -17,7 +17,7 @@ val artemisVersion = "2.33.0" val testContainersVersion = "1.19.8" val keycloakVersion = "24.0.4" val sttpVersion = "3.9.0" -val influxdbVersion = "6.10.0" +val influxdbVersion = "7.1.0" val awsClientVersion = "2.25.32" libraryDependencies ++= Seq( @@ -86,8 +86,6 @@ libraryDependencies ++= Seq( "software.amazon.awssdk" % "sqs" % awsClientVersion, - // Migrated to pekko, but new client 7.0.0 only supports Java 17 (not Java 11) - // https://github.com/influxdata/influxdb-client-java/blob/master/CHANGELOG.md "com.influxdb" %% "influxdb-client-scala" % influxdbVersion, "com.influxdb" % "flux-dsl" % influxdbVersion, "org.influxdb" % "influxdb-java" % "2.23", diff --git a/src/main/java/actor/DemoMessagesActor.java b/src/main/java/actor/DemoMessagesActor.java index 2d951a8c..7845e9c0 100644 --- a/src/main/java/actor/DemoMessagesActor.java +++ b/src/main/java/actor/DemoMessagesActor.java @@ -1,8 +1,9 @@ package actor; -import akka.Done; -import akka.actor.*; -import akka.pattern.Patterns; + +import org.apache.pekko.Done; +import org.apache.pekko.actor.*; +import org.apache.pekko.pattern.Patterns; import java.time.Duration; import java.util.concurrent.CompletionStage; diff --git a/src/main/scala/akkahttp/ReverseProxy.scala b/src/main/scala/akkahttp/ReverseProxy.scala index fb454ec8..5a1b436d 100644 --- a/src/main/scala/akkahttp/ReverseProxy.scala +++ b/src/main/scala/akkahttp/ReverseProxy.scala @@ -11,7 +11,7 @@ import org.apache.pekko.http.scaladsl.server.Directives.* import org.apache.pekko.http.scaladsl.server.Route import org.apache.pekko.http.scaladsl.settings.ServerSettings import org.apache.pekko.http.scaladsl.{Http, HttpExt} -import org.apache.pekko.pattern.CircuitBreaker +import org.apache.pekko.pattern.{CircuitBreaker, CircuitBreakerOpenException} import org.apache.pekko.stream.ThrottleMode import org.apache.pekko.stream.scaladsl.{Sink, Source} import org.slf4j.{Logger, LoggerFactory} @@ -168,7 +168,7 @@ object ReverseProxy extends App { val proxyReq = request.withUri(uri(target)).withHeaders(headers(target)) circuitBreaker.withCircuitBreaker(http.singleRequest(proxyReq)) }.recover { - case _: akka.pattern.CircuitBreakerOpenException => BadGateway(id, "Circuit breaker opened") + case _: CircuitBreakerOpenException => BadGateway(id, "Circuit breaker opened") case _: TimeoutException => GatewayTimeout(id) case e => BadGateway(id, e.getMessage) } diff --git a/src/main/scala/alpakka/influxdb/InfluxdbReader.scala b/src/main/scala/alpakka/influxdb/InfluxdbReader.scala index 4d9116ab..25c48d01 100644 --- a/src/main/scala/alpakka/influxdb/InfluxdbReader.scala +++ b/src/main/scala/alpakka/influxdb/InfluxdbReader.scala @@ -7,13 +7,14 @@ import com.influxdb.query.FluxTable import com.influxdb.query.dsl.Flux import com.influxdb.query.dsl.functions.restriction.Restrictions import org.apache.pekko.actor.ActorSystem -import org.apache.pekko.stream.Supervision +import org.apache.pekko.stream.scaladsl.Sink +import org.apache.pekko.stream.{ActorAttributes, Supervision} import org.slf4j.{Logger, LoggerFactory} import java.time.temporal.ChronoUnit import java.util -import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, ExecutionContextExecutor} import scala.util.control.NonFatal /** @@ -54,25 +55,23 @@ class InfluxdbReader(baseURL: String, token: String, org: String = "testorg", bu |> range(start: -interval) """ - // TODO Activate, when "com.influxdb" %% "influxdb-client-scala" is available for pekko - // def source() = influxdbClientScala - // .getQueryScalaApi() - // .query(query) + def source() = influxdbClientScala + .getQueryScalaApi() + .query(query) - // TODO Activate, when "com.influxdb" %% "influxdb-client-scala" is available for pekko def getQuerySync(mem: String) = { - // logger.info(s"Query raw for measurements of type: $mem") - // val result = source() - // .filter(fluxRecord => fluxRecord.getMeasurement().equals(mem) ) - // .wireTap(fluxRecord => { - // val measurement = fluxRecord.getMeasurement() - // val value = fluxRecord.getValue() - // logger.debug(s"About to process measurement: $measurement with value: $value") - // }) - // .withAttributes(ActorAttributes.supervisionStrategy(deciderFlow)) - // .runWith(Sink.seq) - // - // Await.result(result, 10.seconds) + logger.info(s"Query raw for measurements of type: $mem") + val result = source() + .filter(fluxRecord => fluxRecord.getMeasurement().equals(mem)) + .wireTap(fluxRecord => { + val measurement = fluxRecord.getMeasurement() + val value = fluxRecord.getValue() + logger.debug(s"About to process measurement: $measurement with value: $value") + }) + .withAttributes(ActorAttributes.supervisionStrategy(deciderFlow)) + .runWith(Sink.seq) + + Await.result(result, 10.seconds) } def fluxQueryCount(mem: String): Long = { diff --git a/src/main/scala/sample/stream/TcpEcho.scala b/src/main/scala/sample/stream/TcpEcho.scala index 4d92141a..6055e68e 100644 --- a/src/main/scala/sample/stream/TcpEcho.scala +++ b/src/main/scala/sample/stream/TcpEcho.scala @@ -14,34 +14,31 @@ import scala.sys.process.* import scala.util.{Failure, Success} /** - * Inspired by: - * https://doc.akka.io/docs/akka/current/stream/stream-io.html?language=scala - * - * Use without parameters to start server and 100 parallel clients. + * TCP echo client server round trip + * Use without parameters to start server and 100 parallel clients * * Use parameters `server 127.0.0.1 6000` to start server listening on port 6000 * - * Use parameters `client 127.0.0.1 6000` to start one client connecting to - * server on 127.0.0.1:6000 + * Use parameters `client 127.0.0.1 6000` to start one client * * Run cmd line client: * echo -n "Hello World" | nc 127.0.0.1 6000 * + * Doc: + * https://pekko.apache.org/docs/pekko/current/stream/stream-io.html?language=scala */ object TcpEcho extends App { val logger: Logger = LoggerFactory.getLogger(this.getClass) val systemServer = ActorSystem("TcpEchoServer") val systemClient = ActorSystem("TcpEchoClient") - var serverBinding: Future[Tcp.ServerBinding] = _ - if (args.isEmpty) { val (host, port) = ("127.0.0.1", 6000) - serverBinding = server(systemServer, host, port) + server(systemServer, host, port) + // Issue: https://github.com/akka/akka/issues/29842 checkResources() - // Issue: - // https://github.com/akka/akka/issues/29842 + val maxClients = 100 (1 to maxClients).par.foreach(each => client(each, systemClient, host, port)) } else { @@ -49,7 +46,7 @@ object TcpEcho extends App { if (args.length == 3) (args(1), args(2).toInt) else ("127.0.0.1", 6000) if (args(0) == "server") { - serverBinding = server(systemServer, host, port) + server(systemServer, host, port) } else if (args(0) == "client") { client(1, systemClient, host, port) } @@ -105,12 +102,14 @@ object TcpEcho extends App { // We want "halfClose behavior" on the client side. Doc: // https://github.com/akka/akka/issues/22163 - val connection: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = Tcp().outgoingConnection(remoteAddress = InetSocketAddress.createUnresolved(host, port), halfClose = true) + val connection: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = + Tcp().outgoingConnection(remoteAddress = InetSocketAddress.createUnresolved(host, port), halfClose = true) val testInput = ('a' to 'z').map(ByteString(_)) ++ Seq(ByteString("BYE")) + logger.info(s"Client: $id sending: ${testInput.length} bytes") val restartSettings = RestartSettings(1.second, 10.seconds, 0.2).withMaxRestarts(10, 1.minute) val restartSource = RestartSource.onFailuresWithBackoff(restartSettings) { () => Source(testInput).via(connection) } - val closed = restartSource.runForeach(each => logger.info(s"Client: $id received echo: ${each.utf8String}")) + val closed = restartSource.runForeach(each => logger.info(s"Client: $id received: ${each.utf8String}")) closed.onComplete(each => logger.info(s"Client: $id closed: $each")) } diff --git a/src/main/scala/sample/stream/TcpEchoJava.java b/src/main/scala/sample/stream/TcpEchoJava.java deleted file mode 100644 index 46d79117..00000000 --- a/src/main/scala/sample/stream/TcpEchoJava.java +++ /dev/null @@ -1,104 +0,0 @@ -package sample.stream; - -import akka.Done; -import akka.NotUsed; -import akka.actor.ActorSystem; -import akka.stream.Materializer; -import akka.stream.javadsl.Flow; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.stream.javadsl.Tcp; -import akka.stream.javadsl.Tcp.IncomingConnection; -import akka.stream.javadsl.Tcp.ServerBinding; -import akka.util.ByteString; - -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletionStage; -import java.util.stream.IntStream; - -public class TcpEchoJava { - - /** - * Use without parameters to start both server and 10 clients. - * See also: [[sample.stream.TcpEcho]] - * - *

- * Use parameters `server 0.0.0.0 6000` to start server listening on port - * 6000 - *

- * Use parameters `client 127.0.0.1 6000` to start client connecting to server - * on 127.0.0.1:6000 - *

- */ - public static void main(String[] args) { - ActorSystem systemServer = ActorSystem.create("TcpEchoJavaServer"); - ActorSystem systemClient = ActorSystem.create("TcpEchoJavaClient"); - if (args.length == 0) { - InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 6000); - server(systemServer, serverAddress); - IntStream.range(1, 1000).parallel().forEach(each -> client(systemClient, serverAddress)); - } else { - InetSocketAddress serverAddress; - if (args.length == 3) { - serverAddress = new InetSocketAddress(args[1], Integer.parseInt(args[2])); - } else { - serverAddress = new InetSocketAddress("127.0.0.1", 6000); - } - if (args[0].equals("server")) { - server(systemServer, serverAddress); - } else if (args[0].equals("client")) { - client(systemClient, serverAddress); - } - } - } - - private static void server(ActorSystem system, InetSocketAddress serverAddress) { - final Materializer materializer = Materializer.createMaterializer(system); - - final Sink> handler = Sink.foreach(conn -> { - System.out.println("Client connected from: " + conn.remoteAddress()); - conn.handleWith(Flow.create(), materializer); - }); - - - final CompletionStage bindingFuture = - Tcp.get(system).bind(serverAddress.getHostString(), serverAddress.getPort()).to(handler).run(materializer); - - bindingFuture.handle((ServerBinding binding, Throwable exception) -> { - if (binding != null) { - System.out.println("Server started, listening on: " + binding.localAddress()); - } else { - System.err.println("Server could not bind to " + serverAddress + " : " + exception.getMessage()); - system.terminate(); - } - return NotUsed.getInstance(); - }); - - } - - private static void client(ActorSystem system, InetSocketAddress serverAddress) { - final Materializer materializer = Materializer.createMaterializer(system); - - final List testInput = new ArrayList<>(); - for (char c = 'a'; c <= 'z'; c++) { - testInput.add(ByteString.fromString(String.valueOf(c))); - } - - Source responseStream = - Source.from(testInput).via(Tcp.get(system).outgoingConnection(serverAddress.getHostString(), serverAddress.getPort())); - - CompletionStage result = responseStream.runFold( - ByteString.emptyByteString(), ByteString::concat, materializer); - - result.handle((success, failure) -> { - if (failure != null) { - System.err.println("Failure: " + failure.getMessage()); - } else { - System.out.println("Result: " + success.utf8String()); - } - return NotUsed.getInstance(); - }); - } -} diff --git a/src/test/scala/alpakka/clickhousedb/ClickhousedbIT.java b/src/test/scala/alpakka/clickhousedb/ClickhousedbIT.java index 884eacdd..353d0618 100644 --- a/src/test/scala/alpakka/clickhousedb/ClickhousedbIT.java +++ b/src/test/scala/alpakka/clickhousedb/ClickhousedbIT.java @@ -171,21 +171,15 @@ protected DataSource getDataSource(JdbcDatabaseContainer container) { } protected void createTable() throws SQLException { - // Since we want to be Java 11 source compatible // Doc: https://clickhouse.com/docs/en/engines/table-engines - String newLine = System.lineSeparator(); - String createStatementTextBlock = - "CREATE TABLE test.my_table" - + newLine - + "(" - + newLine - + "`myfloat_nullable` Nullable(Float32)," - + newLine - + "`mystr` String," - + newLine - + "`myint_id` Int32" - + newLine - + ") ENGINE = Log"; + String createStatementTextBlock = """ + CREATE TABLE test.my_table + ( + `myfloat_nullable` Nullable(Float32), + `mystr` String, + `myint_id` Int32 + ) ENGINE = Log + """; LOGGER.info(createStatementTextBlock); diff --git a/src/test/scala/alpakka/influxdb/InfluxdbIT.java b/src/test/scala/alpakka/influxdb/InfluxdbIT.java index ad9b73a1..4efa5e62 100644 --- a/src/test/scala/alpakka/influxdb/InfluxdbIT.java +++ b/src/test/scala/alpakka/influxdb/InfluxdbIT.java @@ -74,8 +74,7 @@ void testWriteAndRead() { .collect(Collectors.toList()); assertThat(CompletableFuture.allOf(futList.toArray(new CompletableFuture[futList.size()]))).succeedsWithin(5 * maxClients, TimeUnit.SECONDS); - // TODO Activate, when "com.influxdb" %% "influxdb-client-scala" is available for pekko - //assertThat(influxDBReader.getQuerySync("testMem").length()).isEqualTo(nPoints * maxClients); + assertThat(influxDBReader.getQuerySync("testMem").length()).isEqualTo(nPoints * maxClients); assertThat(influxDBReader.fluxQueryCount("testMem")).isEqualTo(nPoints * maxClients); assertThat(new LogFileScanner("logs/application.log").run(1, 2, searchAfterPattern, "ERROR").length()).isZero(); }